This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d47c3ea  fix bug in setting the record sequence in Pulsar source and sink (#1905)
d47c3ea is described below

commit d47c3ea8bebc10659949e6f8c76cf41ad9551105
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Mon Jun 4 16:01:44 2018 -0700

    fix bug in setting the record sequence in Pulsar source and sink (#1905)
    
    * fix bug in setting the record sequence in Pulsar source and sink
    
    * removing unnecessary file
---
 .../src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java | 2 +-
 .../src/main/java/org/apache/pulsar/functions/source/PulsarSource.java | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index d1d89da..3684e86 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -34,7 +34,7 @@ import org.apache.pulsar.io.core.Record;
 public class PulsarRecord<T> implements Record<T> {
 
     private String partitionId;
-    private Long sequenceId;
+    private long recordSequence;
     private T value;
     private MessageId messageId;
     private String topicName;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index f3a9805..adc28a1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.Source;
 import org.jboss.util.Classes;
@@ -110,7 +111,7 @@ public class PulsarSource<T> implements Source<T> {
                 .value(input)
                 .messageId(message.getMessageId())
                 .partitionId(String.format("%s-%s", topicName, partitionId))
-                .sequenceId(message.getSequenceId())
+                .recordSequence(Utils.getSequenceId(message.getMessageId()))
                 .topicName(topicName)
                 .ackFunction(() -> {
                     if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to