This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d47c3ea fix bug in setting the record sequence in Pulsar source and sink (#1905) d47c3ea is described below commit d47c3ea8bebc10659949e6f8c76cf41ad9551105 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Mon Jun 4 16:01:44 2018 -0700 fix bug in setting the record sequence in Pulsar source and sink (#1905) * fix bug in setting the record sequence in Pulsar source and sink * removing unnecessary file --- .../src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java | 2 +- .../src/main/java/org/apache/pulsar/functions/source/PulsarSource.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java index d1d89da..3684e86 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java @@ -34,7 +34,7 @@ import org.apache.pulsar.io.core.Record; public class PulsarRecord<T> implements Record<T> { private String partitionId; - private Long sequenceId; + private long recordSequence; private T value; private MessageId messageId; private String topicName; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index f3a9805..adc28a1 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -31,6 +31,7 @@ import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.utils.FunctionConfig; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.Record; import org.apache.pulsar.io.core.Source; import org.jboss.util.Classes; @@ -110,7 +111,7 @@ public class PulsarSource<T> implements Source<T> { .value(input) .messageId(message.getMessageId()) .partitionId(String.format("%s-%s", topicName, partitionId)) - .sequenceId(message.getSequenceId()) + .recordSequence(Utils.getSequenceId(message.getMessageId())) .topicName(topicName) .ackFunction(() -> { if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { -- To stop receiving notification emails like this one, please contact si...@apache.org.