This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch remove-base64 in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 93f38332423294acbd53b3d284e5b6c4e071b139 Author: Wu Sheng <[email protected]> AuthorDate: Wed Mar 16 11:28:27 2022 +0800 Remove hard requirement of BASE64 encoding for binary field --- CHANGES.md | 1 + .../oap/server/core/alarm/AlarmRecord.java | 18 +++------- .../analysis/manual/log/AbstractLogRecord.java | 18 ++-------- .../analysis/manual/segment/SegmentRecord.java | 23 ++++--------- .../manual/errorlog/BrowserErrorLogRecord.java | 18 ++-------- .../core/profile/ProfileThreadSnapshotRecord.java | 7 +--- .../server/core/storage/type/Convert2Entity.java | 11 ++++++ .../server/core/storage/type/Convert2Storage.java | 18 ++++++++++ .../server/core/storage/type/HashMapConverter.java | 39 ++++++++++++++++++++++ .../plugin/jdbc/HashMapConverterWrapper.java | 13 ++++++-- .../storage/plugin/zipkin/ZipkinSpanRecord.java | 17 ++-------- 11 files changed, 99 insertions(+), 84 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 9d3531b..17ed1d2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -114,6 +114,7 @@ Release Notes. * Refactor the core Builder mechanism, new storage plugin could implement their own converter and get rid of hard requirement of using HashMap to communicate between data object and database native structure. * [Breaking Change] Break all existing 3rd-party storage extensions. +* Remove hard requirement of BASE64 encoding for binary field. #### UI diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java index a65f22f..50422e0 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java @@ -18,7 +18,6 @@ package org.apache.skywalking.oap.server.core.alarm; -import java.util.Base64; import java.util.List; import lombok.Getter; import lombok.Setter; @@ -32,9 +31,8 @@ import org.apache.skywalking.oap.server.core.source.ScopeDeclaration; import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity; import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; +import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; -import org.apache.skywalking.oap.server.library.util.CollectionUtils; -import org.apache.skywalking.oap.server.library.util.StringUtil; import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ALARM; @@ -94,12 +92,8 @@ public class AlarmRecord extends Record { record.setStartTime(((Number) converter.get(START_TIME)).longValue()); record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue()); record.setRuleName((String) converter.get(RULE_NAME)); - if (StringUtil.isEmpty((String) converter.get(TAGS_RAW_DATA))) { - record.setTagsRawData(new byte[] {}); - } else { - // Don't read the tags as they has been in the data binary already. - record.setTagsRawData(Base64.getDecoder().decode((String) converter.get(TAGS_RAW_DATA))); - } + record.setTagsRawData(converter.getWith(TAGS_RAW_DATA, new HashMapConverter.ToEntity.Base64Decoder())); + // Don't read the TAGS as they are only for query. return record; } @@ -113,11 +107,7 @@ public class AlarmRecord extends Record { converter.accept(START_TIME, storageData.getStartTime()); converter.accept(TIME_BUCKET, storageData.getTimeBucket()); converter.accept(RULE_NAME, storageData.getRuleName()); - if (CollectionUtils.isEmpty(storageData.getTagsRawData())) { - converter.accept(TAGS_RAW_DATA, Const.EMPTY_STRING); - } else { - converter.accept(TAGS_RAW_DATA, new String(Base64.getEncoder().encode(storageData.getTagsRawData()))); - } + converter.accept(TAGS_RAW_DATA, storageData.getTagsRawData()); converter.accept(TAGS, storageData.getTagsInString()); } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/AbstractLogRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/AbstractLogRecord.java index d2c0c58..4140e96 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/AbstractLogRecord.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/AbstractLogRecord.java @@ -18,11 +18,9 @@ package org.apache.skywalking.oap.server.core.analysis.manual.log; -import java.util.Base64; import java.util.List; import lombok.Getter; import lombok.Setter; -import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag; import org.apache.skywalking.oap.server.core.analysis.record.Record; @@ -30,9 +28,8 @@ import org.apache.skywalking.oap.server.core.query.type.ContentType; import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity; import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; +import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; -import org.apache.skywalking.oap.server.library.util.CollectionUtils; -import org.apache.skywalking.oap.server.library.util.StringUtil; public abstract class AbstractLogRecord extends Record { @@ -121,12 +118,7 @@ public abstract class AbstractLogRecord extends Record { record.setContentType(((Number) converter.get(CONTENT_TYPE)).intValue()); record.setContent((String) converter.get(CONTENT)); record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue()); - if (StringUtil.isEmpty((String) converter.get(TAGS_RAW_DATA))) { - record.setTagsRawData(new byte[] {}); - } else { - // Don't read the tags as they has been in the data binary already. - record.setTagsRawData(Base64.getDecoder().decode((String) converter.get(TAGS_RAW_DATA))); - } + record.setTagsRawData(converter.getWith(TAGS_RAW_DATA, new HashMapConverter.ToEntity.Base64Decoder())); record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue()); } @@ -141,11 +133,7 @@ public abstract class AbstractLogRecord extends Record { converter.accept(CONTENT_TYPE, record.getContentType()); converter.accept(CONTENT, record.getContent()); converter.accept(TIMESTAMP, record.getTimestamp()); - if (CollectionUtils.isEmpty(record.getTagsRawData())) { - converter.accept(TAGS_RAW_DATA, Const.EMPTY_STRING); - } else { - converter.accept(TAGS_RAW_DATA, new String(Base64.getEncoder().encode(record.getTagsRawData()))); - } + converter.accept(TAGS_RAW_DATA, record.getTagsRawData()); converter.accept(TAGS, record.getTagsInString()); } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java index 6394762..8a7fe83 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java @@ -18,15 +18,9 @@ package org.apache.skywalking.oap.server.core.analysis.manual.segment; -import java.util.Base64; import java.util.List; import lombok.Getter; import lombok.Setter; -import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity; -import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; -import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; -import org.apache.skywalking.oap.server.library.util.StringUtil; -import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.analysis.Stream; import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag; import org.apache.skywalking.oap.server.core.analysis.record.Record; @@ -34,7 +28,10 @@ import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcess import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset; -import org.apache.skywalking.oap.server.library.util.CollectionUtils; +import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity; +import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; +import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter; +import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; @SuperDataset @Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, builder = SegmentRecord.Builder.class, processor = RecordStreamProcessor.class) @@ -118,11 +115,7 @@ public class SegmentRecord extends Record { record.setLatency(((Number) converter.get(LATENCY)).intValue()); record.setIsError(((Number) converter.get(IS_ERROR)).intValue()); record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue()); - if (StringUtil.isEmpty((String) converter.get(DATA_BINARY))) { - record.setDataBinary(new byte[] {}); - } else { - record.setDataBinary(Base64.getDecoder().decode((String) converter.get(DATA_BINARY))); - } + record.setDataBinary(converter.getWith(DATA_BINARY, new HashMapConverter.ToEntity.Base64Decoder())); // Don't read the tags as they have been in the data binary already. return record; } @@ -138,11 +131,7 @@ public class SegmentRecord extends Record { converter.accept(LATENCY, storageData.getLatency()); converter.accept(IS_ERROR, storageData.getIsError()); converter.accept(TIME_BUCKET, storageData.getTimeBucket()); - if (CollectionUtils.isEmpty(storageData.getDataBinary())) { - converter.accept(DATA_BINARY, Const.EMPTY_STRING); - } else { - converter.accept(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary()))); - } + converter.accept(DATA_BINARY, storageData.getDataBinary()); converter.accept(TAGS, storageData.getTags()); } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/manual/errorlog/BrowserErrorLogRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/manual/errorlog/BrowserErrorLogRecord.java index 853cca1..6f0ecba 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/manual/errorlog/BrowserErrorLogRecord.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/manual/errorlog/BrowserErrorLogRecord.java @@ -17,10 +17,8 @@ package org.apache.skywalking.oap.server.core.browser.manual.errorlog; -import java.util.Base64; import lombok.Getter; import lombok.Setter; -import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.analysis.Stream; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; @@ -29,9 +27,8 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset; import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity; import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; +import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; -import org.apache.skywalking.oap.server.library.util.CollectionUtils; -import org.apache.skywalking.oap.server.library.util.StringUtil; @SuperDataset @Stream(name = BrowserErrorLogRecord.INDEX_NAME, scopeId = DefaultScopeDefine.BROWSER_ERROR_LOG, builder = BrowserErrorLogRecord.Builder.class, processor = RecordStreamProcessor.class) @@ -96,12 +93,7 @@ public class BrowserErrorLogRecord extends Record { record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue()); record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue()); record.setErrorCategory(((Number) converter.get(ERROR_CATEGORY)).intValue()); - String dataBinary = (String) converter.get(DATA_BINARY); - if (StringUtil.isEmpty(dataBinary)) { - record.setDataBinary(new byte[] {}); - } else { - record.setDataBinary(Base64.getDecoder().decode(dataBinary)); - } + record.setDataBinary(converter.getWith(DATA_BINARY, new HashMapConverter.ToEntity.Base64Decoder())); return record; } @@ -114,11 +106,7 @@ public class BrowserErrorLogRecord extends Record { converter.accept(TIMESTAMP, storageData.getTimestamp()); converter.accept(TIME_BUCKET, storageData.getTimeBucket()); converter.accept(ERROR_CATEGORY, storageData.getErrorCategory()); - if (CollectionUtils.isEmpty(storageData.getDataBinary())) { - converter.accept(DATA_BINARY, Const.EMPTY_STRING); - } else { - converter.accept(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary()))); - } + converter.accept(DATA_BINARY, storageData.getDataBinary()); } } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileThreadSnapshotRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileThreadSnapshotRecord.java index ac977e5..926259d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileThreadSnapshotRecord.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileThreadSnapshotRecord.java @@ -31,7 +31,6 @@ import org.apache.skywalking.oap.server.core.storage.annotation.QueryUnifiedInde import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity; import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; -import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.library.util.StringUtil; import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_SEGMENT_SNAPSHOT; @@ -95,11 +94,7 @@ public class ProfileThreadSnapshotRecord extends Record { converter.accept(DUMP_TIME, storageData.getDumpTime()); converter.accept(SEQUENCE, storageData.getSequence()); converter.accept(TIME_BUCKET, storageData.getTimeBucket()); - if (CollectionUtils.isEmpty(storageData.getStackBinary())) { - converter.accept(STACK_BINARY, Const.EMPTY_STRING); - } else { - converter.accept(STACK_BINARY, new String(Base64.getEncoder().encode(storageData.getStackBinary()))); - } + converter.accept(STACK_BINARY, new String(Base64.getEncoder().encode(storageData.getStackBinary()))); } } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/Convert2Entity.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/Convert2Entity.java index 29de446..4444d6e 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/Convert2Entity.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/Convert2Entity.java @@ -18,9 +18,20 @@ package org.apache.skywalking.oap.server.core.storage.type; +import java.util.function.Function; + /** * A function supplier to convert raw data from database to object defined in OAP */ public interface Convert2Entity { Object get(String fieldName); + + /** + * Use the given type decoder to decode value of given field name. + * + * @param fieldName to read value + * @param typeDecoder to decode the value + * @return decoded value + */ + <T, R> R getWith(String fieldName, Function<T, R> typeDecoder); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/Convert2Storage.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/Convert2Storage.java index 0d0a3cd..2f92cbf 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/Convert2Storage.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/Convert2Storage.java @@ -18,6 +18,8 @@ package org.apache.skywalking.oap.server.core.storage.type; +import java.util.List; + /** * A function supplier to accept key-value pair, and convert to the expected database structure according to storage * implementation. @@ -25,9 +27,25 @@ package org.apache.skywalking.oap.server.core.storage.type; * @param <R> Type of database required structure. */ public interface Convert2Storage<R> { + /** + * Accept general type key/value. + */ void accept(String fieldName, Object fieldValue); + /** + * Accept String key and byte array value. + */ + void accept(String fieldName, byte[] fieldValue); + + /** + * Accept String key and String list value. + */ + void accept(String fieldName, List<String> fieldValue); + Object get(String fieldName); + /** + * @return the converted data + */ R obtain(); } \ No newline at end of file diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/HashMapConverter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/HashMapConverter.java index feba33a..f74af5f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/HashMapConverter.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/HashMapConverter.java @@ -18,9 +18,15 @@ package org.apache.skywalking.oap.server.core.storage.type; +import java.util.Base64; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.Function; import lombok.RequiredArgsConstructor; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; +import org.apache.skywalking.oap.server.library.util.StringUtil; public class HashMapConverter { /** @@ -34,6 +40,25 @@ public class HashMapConverter { public Object get(final String fieldName) { return source.get(fieldName); } + + @Override + public <T, R> R getWith(final String fieldName, final Function<T, R> typeDecoder) { + final T value = (T) source.get(fieldName); + return typeDecoder.apply(value); + } + + /** + * Default Base64Decoder supplier + */ + public static class Base64Decoder implements Function<String, byte[]> { + @Override + public byte[] apply(final String encodedStr) { + if (StringUtil.isEmpty(encodedStr)) { + return new byte[] {}; + } + return Base64.getDecoder().decode(encodedStr); + } + } } /** @@ -52,6 +77,20 @@ public class HashMapConverter { } @Override + public void accept(final String fieldName, final byte[] fieldValue) { + if (CollectionUtils.isEmpty(fieldValue)) { + source.put(fieldName, Const.EMPTY_STRING); + } else { + source.put(fieldName, new String(Base64.getEncoder().encode(fieldValue))); + } + } + + @Override + public void accept(final String fieldName, final List<String> fieldValue) { + this.accept(fieldName, (Object) fieldValue); + } + + @Override public Object get(final String fieldName) { return source.get(fieldName); } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/HashMapConverterWrapper.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/HashMapConverterWrapper.java index e95d602..11d24b2 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/HashMapConverterWrapper.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/HashMapConverterWrapper.java @@ -18,6 +18,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc; +import java.util.List; import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter; @@ -32,11 +33,17 @@ public class HashMapConverterWrapper { return new HashMapConverter.ToStorage() { @Override public void accept(final String fieldName, final Object fieldValue) { - if (fieldName.equals("tags")) { - return; - } origin.accept(fieldName, fieldValue); } + + /** + * Skip String list type column in SQL-style database. The values are processed by + * AbstractSearchTagBuilder#analysisSearchTag(List, Convert2Storage) and TAGS_RAW_DATA column + */ + @Override + public void accept(final String fieldName, final List<String> fieldValue) { + + } }; } } diff --git a/oap-server/server-storage-plugin/storage-zipkin-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java index e3f551d..95444d6 100644 --- a/oap-server/server-storage-plugin/storage-zipkin-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java +++ b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java @@ -18,11 +18,9 @@ package org.apache.skywalking.oap.server.storage.plugin.zipkin; -import java.util.Base64; import java.util.List; import lombok.Getter; import lombok.Setter; -import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.analysis.Stream; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; @@ -31,9 +29,8 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset; import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity; import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; +import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; -import org.apache.skywalking.oap.server.library.util.CollectionUtils; -import org.apache.skywalking.oap.server.library.util.StringUtil; @SuperDataset @Stream(name = ZipkinSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ZIPKIN_SPAN, builder = ZipkinSpanRecord.Builder.class, processor = RecordStreamProcessor.class) @@ -126,11 +123,7 @@ public class ZipkinSpanRecord extends Record { record.setLatency(((Number) converter.get(LATENCY)).intValue()); record.setIsError(((Number) converter.get(IS_ERROR)).intValue()); record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue()); - if (StringUtil.isEmpty((String) converter.get(DATA_BINARY))) { - record.setDataBinary(new byte[] {}); - } else { - record.setDataBinary(Base64.getDecoder().decode((String) converter.get(DATA_BINARY))); - } + record.setDataBinary(converter.getWith(DATA_BINARY, new HashMapConverter.ToEntity.Base64Decoder())); record.setEncode(((Number) converter.get(ENCODE)).intValue()); // Don't read the tags as they have been in the data binary already. return record; @@ -149,11 +142,7 @@ public class ZipkinSpanRecord extends Record { converter.accept(LATENCY, storageData.getLatency()); converter.accept(IS_ERROR, storageData.getIsError()); converter.accept(TIME_BUCKET, storageData.getTimeBucket()); - if (CollectionUtils.isEmpty(storageData.getDataBinary())) { - converter.accept(DATA_BINARY, Const.EMPTY_STRING); - } else { - converter.accept(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary()))); - } + converter.accept(DATA_BINARY, storageData.getDataBinary()); converter.accept(ENCODE, storageData.getEncode()); converter.accept(TAGS, storageData.getTags()); }
