This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f8e7dfe Removed client-admin dependency from function-utils (#2739) f8e7dfe is described below commit f8e7dfe1aa4f93c1fb724d705b10f89264583c27 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Sat Oct 6 10:27:52 2018 -0700 Removed client-admin dependency from function-utils (#2739) --- .../apache/pulsar/functions/instance/Utils.java | 57 ++++++++++++++++++++++ .../pulsar/functions/source/PulsarRecord.java | 2 +- .../pulsar/functions/instance}/UtilsTest.java | 2 +- pulsar-functions/utils/pom.xml | 14 ++++-- .../pulsar/functions/utils/FunctionConfig.java | 2 - .../functions/utils/FunctionConfigUtils.java | 1 - .../apache/pulsar/functions/utils/Resources.java | 3 -- .../org/apache/pulsar/functions/utils/Utils.java | 25 ---------- .../validation/ConfigValidationAnnotations.java | 2 - .../apache/pulsar/io/jdbc/JdbcAbstractSink.java | 7 ++- 10 files changed, 72 insertions(+), 43 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/Utils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/Utils.java new file mode 100644 index 0000000..7149bfe --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/Utils.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance; + +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * Utils used for instance. + */ +@Slf4j +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class Utils { + + public static final long getSequenceId(MessageId messageId) { + MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl) + ? ((TopicMessageIdImpl) messageId).getInnerMessageId() + : messageId); + long ledgerId = msgId.getLedgerId(); + long entryId = msgId.getEntryId(); + + // Combine ledger id and entry id to form offset + // Use less than 32 bits to represent entry id since it will get + // rolled over way before overflowing the max int range + long offset = (ledgerId << 28) | entryId; + return offset; + } + + public static final MessageId getMessageId(long sequenceId) { + // Demultiplex ledgerId and entryId from offset + long ledgerId = sequenceId >>> 28; + long entryId = sequenceId & 0x0F_FF_FF_FFL; + + return new MessageIdImpl(ledgerId, entryId, -1); + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java index 359f48e..dc5a08a 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java @@ -29,7 +29,7 @@ import lombok.ToString; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.EncryptionContext; -import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.functions.instance.Utils; @Builder @Getter diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/UtilsTest.java similarity index 97% rename from pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java rename to pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/UtilsTest.java index 511270a..ba83c89 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/UtilsTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.utils; +package org.apache.pulsar.functions.instance; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml index 0d7f941..6c6930f 100644 --- a/pulsar-functions/utils/pom.xml +++ b/pulsar-functions/utils/pom.xml @@ -34,14 +34,20 @@ <dependencies> <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>pulsar-client-admin-original</artifactId> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-common</artifactId> <version>${project.version}</version> </dependency> <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-all</artifactId> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-io-core</artifactId> + <version>${project.version}</version> </dependency> <dependency> diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index d2b94e3..2e43edc 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -29,13 +29,11 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import lombok.ToString; -import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.utils.validation.ConfigValidation; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists; -import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index d546830..cf182a8 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -27,7 +27,6 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails; import java.util.HashMap; import java.util.Map; -import static org.apache.commons.lang.StringUtils.isBlank; import static org.apache.commons.lang.StringUtils.isNotBlank; import static org.apache.commons.lang.StringUtils.isNotEmpty; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java index 7e8a127..5c707fa 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java @@ -20,9 +20,6 @@ package org.apache.pulsar.functions.utils; import lombok.*; -import java.util.HashMap; -import java.util.Map; - @Getter @Setter @Data diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index 7befc85..d35be61 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -29,9 +29,6 @@ import java.lang.reflect.Type; import java.net.ServerSocket; import java.util.Collection; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime; import org.apache.pulsar.io.core.Sink; @@ -57,28 +54,6 @@ public class Utils { public static String FILE = "file"; public static String BUILTIN = "builtin"; - public static final long getSequenceId(MessageId messageId) { - MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl) - ? ((TopicMessageIdImpl) messageId).getInnerMessageId() - : messageId); - long ledgerId = msgId.getLedgerId(); - long entryId = msgId.getEntryId(); - - // Combine ledger id and entry id to form offset - // Use less than 32 bits to represent entry id since it will get - // rolled over way before overflowing the max int range - long offset = (ledgerId << 28) | entryId; - return offset; - } - - public static final MessageId getMessageId(long sequenceId) { - // Demultiplex ledgerId and entryId from offset - long ledgerId = sequenceId >>> 28; - long entryId = sequenceId & 0x0F_FF_FF_FFL; - - return new MessageIdImpl(ledgerId, entryId, -1); - } - public static String printJson(MessageOrBuilder msg) throws IOException { return JsonFormat.printer().print(msg); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java index 08f0d66..d562404 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.functions.utils.validation; -import org.apache.pulsar.functions.utils.FunctionConfig; - import java.lang.annotation.ElementType; import java.lang.annotation.Repeatable; import java.lang.annotation.Retention; diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index 91d043d..a4c589f 100644 --- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -19,8 +19,6 @@ package org.apache.pulsar.io.jdbc; -import static jersey.repackaged.com.google.common.base.Preconditions.checkState; - import com.google.common.collect.Lists; import java.sql.Connection; import java.sql.PreparedStatement; @@ -139,8 +137,9 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> { if (log.isDebugEnabled()) { log.debug("Starting flush, queue size: {}", incomingList.size()); } - checkState(swapList.isEmpty(), - "swapList should be empty since last flush. swapList.size: " + swapList.size()); + if (!swapList.isEmpty()) { + throw new IllegalStateException("swapList should be empty since last flush. swapList.size: " + swapList.size()); + } synchronized (incomingList) { List<Record<T>> tmpList;