2020-06-10 10:49:06 UTC - Gautam Lodhiya: Hi, how can I pause and resume 
consuming messages with the WebSocket API?
----
2020-06-10 11:38:40 UTC - alex kurtser: Hi @Sijie Guo

Sorry for disturbing. Do you have any advice for us? Is our design wrong, and 
does it need to be changed in order to make function metrics work as expected?
----
2020-06-10 13:15:55 UTC - Ebere Abanonu: If you can find a way not to send the 
Flow command to the Pulsar server
----
2020-06-10 14:28:41 UTC - Aaron Batilo: I had asked the same question in 
#kubernetes and it looks like they will be!
----
2020-06-10 14:28:42 UTC - Aaron Batilo: 
<https://apache-pulsar.slack.com/archives/CJ0FMGHSM/p1591723600263200>
----
2020-06-10 14:29:42 UTC - slouie: This sounds like the issue I’m seeing and 
posted about here. 
<https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1591375438262800>

I see logs reporting the backlogQuota was reached, yet the backlog message 
count and unack’d messages metrics do not increase. Was there more discovered 
about this problem?
----
2020-06-10 15:45:22 UTC - Sijie Guo: backlog isn’t really related to GC. 
Backlog indicates there are subscriptions not acknowledging messages. Did you 
capture `topics stats` and `topics stats-internal`?
----
2020-06-10 15:47:46 UTC - slouie: I did not get those two calls at the time. It 
took me a while to come back to this being a backlog issue because the metrics 
`pulsar_msg_backlog` and `pulsar_subscription_unacked_messages` remained 
steady. The only signal that seemed to make sense was `pulsar_storage_size`, it 
grew for a period and hit a ceiling
----
2020-06-10 15:47:48 UTC - Sijie Guo: Yes. It will be recorded.
----
2020-06-10 15:49:49 UTC - Sijie Guo: Yes. If you don’t ack messages, you will 
get backlog. So in any case, when you have issues on a topic, `topics stats` 
and `topics stats-internal` are the best tools for troubleshooting.
----
2020-06-10 15:50:12 UTC - slouie: shouldn’t the `msg_backlog` and `unacked` 
metrics increase when there is a problem with not properly ack’ing messages?
----
2020-06-10 15:50:54 UTC - Sijie Guo: it will stop increasing if you hit the 
backlog quota limit
----
2020-06-10 15:51:15 UTC - Sijie Guo: If I understand that correctly, you hit 
the backlog quota limit, no?
----
2020-06-10 15:53:30 UTC - slouie: That’s what the log messages state, even 
though it doesn’t seem to add up when I look at the `msg_backlog` and `unacked` 
metrics.

I had unack’ed messages hovering ~100 and the local backlog count holding at 
~50.

I can’t reproduce why storage size just increased to ~20GB and then hit the 
backlog quota. My backlog quota is the default 10G
----
2020-06-10 15:56:03 UTC - slouie: if I saw the `msg_backlog` and `unacked` 
metrics increase it would make total sense to me. I’m not grasping why those 
metrics remain consistent and then I suddenly hit a backlog_quota. It made me 
think it was something to do with retention and GC since the metrics indicate 
messages were still being ack’d.

`pulsar_rate_in` and `pulsar_rate_out`  were also constant until the limit was 
reached
----
2020-06-10 16:45:40 UTC - Erik Jansen: We’ve created a small test program which 
reads lines from a file and produces messages in batches with sendAsync. When 
the file is completely produced, we close the producer with closeAsync.

We’ve created a reader on the topic. But for some reason the reader doesn’t 
receive all messages. Any thoughts on whether we are doing something wrong?

These are the example programs:

import org.apache.pulsar.client.api.*;

import java.util.Date;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class CsvExample {
    /**
     * The initiated pulsar client, initiated in the runProducerExample method.
     */
    private PulsarClient client;

    /**
     * Constructor
     */
    CsvExample() {
        System.out.println("Producer Example created");
    }

    /**
     * Initiates the pulsar client and sends messages;
     *
     * @throws IOException
     */
    public void sendMessages(String topicName) throws IOException {
        Map<String, Object> config = getPulsarClientConfig();

        try {
            // Create the pulsar client with the config defined in
            client = PulsarClient.builder().loadConf(config).build();

            // Create a producer for the passed on topic
            Producer<byte[]> producer = createProducer(topicName);
            Date date = new Date();
            DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            String stringDate = sdf.format(date);
            System.out.println("Start sending messages at: " + stringDate);

            Stream<String> lines = Files.lines(
                    Paths.get("/Users/erik/i-refactory-demo/tpc_h/deliveredData/Initial load/LINEITEM.tbl"));

            int numLines = 0;

            for (String line : (Iterable<String>) lines::iterator) {
                producer.sendAsync(line.getBytes());
                numLines++;

                if (numLines % 1000000 == 0) {
                    System.out.println(sdf.format(new Date()) + " - num lines: " + numLines);
                }
            }

            System.out.println("num lines: " + numLines);

            lines.close();
            System.out.println("Waiting for async ops to complete");
            try {
                producer.closeAsync().get();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            try {
                client.closeAsync().get();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            date = new Date();
            sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            stringDate = sdf.format(date);
            System.out.println("All operations completed at: " + stringDate);

        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds the producer.
     *
     * @link https://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerBuilder.html
     *       for the building methods which can be called to configure the
     *       producer.
     *
     * @param topicName The name of the topic to which the producer's messages
     *                  should be sent.
     *
     * @return A byte producer.
     *
     * @throws PulsarClientException A checked exception which should be caught.
     */
    private Producer<byte[]> createProducer(String topicName) throws PulsarClientException {
        // Producer<String> producer =
        // client.newProducer(Schema.STRING).topic(topicName)
        Producer<byte[]> producer = client.newProducer().topic(topicName)
                .batchingMaxPublishDelay(5000, TimeUnit.MILLISECONDS)
                .enableBatching(true).batchingMaxMessages(500).batchingMaxBytes(4000000).blockIfQueueFull(true)
                .compressionType(CompressionType.LZ4).create();

        return producer;
    }

    /**
     * Get the configuration for the pulsar client.
     *
     * @link https://pulsar.apache.org/docs/en/client-libraries-java/#client
     *       (for configuration values)
     *
     * @return A map containing the configuration values for the client.
     */
    private Map<String, Object> getPulsarClientConfig() {
        Map<String, Object> config = new HashMap<>();

        config.put("serviceUrl", "pulsar://localhost:6650");
        config.put("numIoThreads", 3);
        config.put("numListenerThreads", 3);

        return config;
    }

    public static void main(String[] args) throws IOException {
        CsvExample producer = new CsvExample();
        producer.sendMessages("public/default/line-item-12");
    }
}

import org.apache.pulsar.client.api.*;

public class ReaderExample {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

        // Class regionClass = Region.class;
        // Schema schema = Schema.AVRO(regionClass);
        // Reader<Region> regionReader =
        // pulsarClient.newReader().topic("region").startMessageId(MessageId.earliest).create();

        Reader<byte[]> reader = pulsarClient.newReader().topic("public/default/line-item-12")
                .startMessageId(MessageId.earliest).create();

        int numLines = 0;
        while (true) {
            // Message<Region> message = regionReader.readNext();
            Message<byte[]> message = reader.readNext();
            numLines++;
            if (numLines % 1000000 == 0) {
                System.out.println(" - num lines: " + numLines);
            }

            if (numLines > 6000000) {
                System.out.println(" - num lines: " + numLines);

            }

        }
    }

}
----
2020-06-10 16:48:38 UTC - Matteo Merli: When you call `sendAsync()` you need to 
check the future returned for the successful completion of the operation.

In this case, the problem is that, by default, when the producer's outgoing 
queue is full, it immediately rejects new send requests.

This is done so that, by default, `sendAsync()` never blocks.

If you want to send as fast as possible, you should instead block when the 
queue is full. For that, use `blockIfQueueFull=true` in the producer builder.
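
As a rough illustration of checking the returned futures, here is a minimal sketch that uses only the JDK. The `AsyncSendTracker` class and the `sender` function are hypothetical stand-ins (in a real program `sender` might be something like `producer::sendAsync`); nothing here is part of the Pulsar client API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical helper: counts how many async sends succeeded or failed.
final class AsyncSendTracker {
    private final AtomicLong succeeded = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();
    // Keeping every future is fine for a sketch; for millions of messages
    // the two counters alone would be lighter on memory.
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    /** Calls the sender and records whether its future completed normally or exceptionally. */
    public <T> void send(Function<byte[], CompletableFuture<T>> sender, byte[] payload) {
        CompletableFuture<Void> tracked = sender.apply(payload).<Void>handle((result, error) -> {
            if (error == null) {
                succeeded.incrementAndGet();
            } else {
                failed.incrementAndGet();
            }
            return null;
        });
        pending.add(tracked);
    }

    /** Blocks until every tracked send has completed, successfully or not. */
    public void awaitAll() {
        CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
    }

    public long succeeded() {
        return succeeded.get();
    }

    public long failed() {
        return failed.get();
    }

    public static void main(String[] args) {
        AsyncSendTracker tracker = new AsyncSendTracker();
        // Fake sender for the demo: rejects empty payloads, accepts everything else.
        Function<byte[], CompletableFuture<Integer>> fakeSender = payload -> {
            CompletableFuture<Integer> future = new CompletableFuture<>();
            if (payload.length == 0) {
                future.completeExceptionally(new IllegalStateException("rejected"));
            } else {
                future.complete(payload.length);
            }
            return future;
        };
        tracker.send(fakeSender, "line 1".getBytes());
        tracker.send(fakeSender, new byte[0]);
        tracker.awaitAll();
        System.out.println("succeeded=" + tracker.succeeded() + " failed=" + tracker.failed());
    }
}
```

Comparing the `failed` count with the number of lines read would show whether any sends were rejected before the producer was closed.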
----
2020-06-10 16:57:09 UTC - Sijie Guo: Interesting - do you set retention?
----
2020-06-10 17:13:43 UTC - slouie: yup. `defaultRetentionTimeInMinutes: 0` and 
`defaultRetentionSizeInMB: 0`. Which as I understand is the most aggressive 
retention policy one could set
----
2020-06-10 17:16:05 UTC - slouie: I see `2.4.0` has a metric specifically 
around `backlog_size` which should be helpful. I’m just at a loss on how to 
reproduce and be alerted to this since alerts based off `msg_backlog` and 
`unacked` wouldn’t work. I guess alerting on `storage_size` is a proxy for the 
problem, but that lacks precision in `2.3`
----
2020-06-10 17:18:52 UTC - Devin G. Bost: Here’s my latest Pulsar video: 
<https://www.youtube.com/watch?v=vlU9UegYab8&feature=youtu.be>
clap : Karthik Ramasamy, Damien Burke
----
2020-06-10 18:33:37 UTC - Sijie Guo: I am not sure either. Without the 
information from `topics stats` or `topics stats-internal`, it is really hard 
to debug this issue.
----
2020-06-10 19:09:52 UTC - Fred George: A slightly random (and not particularly 
important) question: where did the name 'Pulsar' come from? Some one asked me 
and I wondered if there was a story behind the name??
----
2020-06-10 19:10:38 UTC - Matteo Merli: The logo has a hint
----
2020-06-10 19:39:40 UTC - Erik Jansen: Hi Merlimat, `blockIfQueueFull` is set 
to true.
----
2020-06-10 19:40:08 UTC - Fred George: ian curtis fan maybe?
----
2020-06-10 19:48:49 UTC - Marcio Martins: Which version would you guys 
recommend for running successfully in production? I am having a hard time 
finding a version that is stable. 2.5.2 has problems with S3 offload and cannot 
do hardware-accelerated CRC32C... 2.5.1 spams exceptions after a while. Should 
I keep going to earlier versions?
----
2020-06-10 20:05:06 UTC - Sijie Guo: I don’t think you should go back to an 
earlier version.

Can you share more details on the s3 offload and CRC32C issue?
----
2020-06-10 20:14:11 UTC - Damien Burke: Re pulsar functions, can anyone share 
any documentation/best practises around handling failure? The function 
interface declares it throws a checked Exception;
```O process(I input, Context context) throws Exception```
, but there is zero JavaDoc. I was anticipating there may be a configurable 
retry or error / dead letter queue, but again, I can't find any related docs. 
Thanks
----
2020-06-10 20:21:49 UTC - Marcio Martins: It fails to load the class
----
2020-06-10 20:21:53 UTC - Marcio Martins: One sec
----
2020-06-10 20:22:21 UTC - Marcio Martins: ```ERROR 
org.apache.bookkeeper.proto.checksum.CRC32CDigestManager - Sse42Crc32C is not 
supported, will use a slower CRC32C implementation.```

----
2020-06-10 20:22:31 UTC - Damien Burke: To provide an example, i need an 
enricher function, which depends on making a REST call. This REST call could of 
course fail due to a simple (temp) network issue, eg. Does the "framework" give 
me anything to support handling this? I could have some local retry logic to 
call REST api again. I know I could have logic to not write to the output topic 
if REST call fails. Is that kinda it?
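
The "local retry logic" option could look roughly like the sketch below. It uses only the JDK and makes no claim about what the Functions framework itself does when `process` throws; `RetryingCall` and `withRetry` are hypothetical names, and the attempt count and delay are arbitrary.

```java
import java.util.concurrent.Callable;

// Hypothetical helper for retrying a flaky call, e.g. a REST lookup in an enricher.
final class RetryingCall {
    /**
     * Runs the call up to maxAttempts times, doubling the sleep between
     * attempts, and rethrows the last failure if every attempt fails.
     */
    static <T> T withRetry(Callable<T> call, int maxAttempts, long initialDelayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay); // back off before the next attempt
                    delay *= 2;
                }
            }
        }
        throw last;
    }
}
```

Inside `process`, the REST call would be wrapped as `RetryingCall.withRetry(() -> callRestApi(input), 3, 200)`, where `callRestApi` is a placeholder for the actual lookup; whether to rethrow or skip the output after the last failure remains a design decision for the function author.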
----
2020-06-10 20:23:13 UTC - Marcio Martins: It also fails to load the S3 
offloader with this error:
```Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint 
must be set if s3 offload enabled```
Even though those requirements are met, it's the exact same config I use for 
2.5.1
----
2020-06-10 20:25:51 UTC - slouie: Is there something specific you would be 
looking for in `topic stats` and `topics stats-internal`?
----
2020-06-10 22:51:52 UTC - Sijie Guo: In stats-internal,  check the read and 
deleted positions of cursors. And check individual deletes
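
To make that comparison concrete, here is a small sketch that assumes cursor positions are printed as `ledgerId:entryId` strings, that the mark-delete position is the last entry acknowledged in order, and that the read position is the next entry to be read. `CursorGap` and its methods are hypothetical helpers, not part of any Pulsar tooling, and the sample positions are made up.

```java
// Hypothetical helper for eyeballing the distance between two cursor positions.
final class CursorGap {
    /** Parses "ledgerId:entryId" into a two-element array {ledgerId, entryId}. */
    static long[] parsePosition(String text) {
        String[] parts = text.trim().split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected ledgerId:entryId but got: " + text);
        }
        return new long[] { Long.parseLong(parts[0]), Long.parseLong(parts[1]) };
    }

    /**
     * Number of entries between the mark-delete position and the read position
     * when both are in the same ledger; -1 when they are in different ledgers,
     * because the gap then depends on ledger sizes not available here.
     */
    static long sameLedgerGap(String markDeletePosition, String readPosition) {
        long[] markDelete = parsePosition(markDeletePosition);
        long[] read = parsePosition(readPosition);
        if (markDelete[0] != read[0]) {
            return -1;
        }
        return Math.max(0, read[1] - markDelete[1] - 1);
    }

    public static void main(String[] args) {
        // Made-up positions for illustration only.
        System.out.println(sameLedgerGap("5:10", "5:50"));
    }
}
```

A large gap that keeps growing suggests entries were read but never acknowledged in order; entries acknowledged out of order would be listed under the individual deletes, so this number can overstate what is really unacknowledged.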
----
2020-06-11 00:31:55 UTC - Matteo Merli: @Marcio Martins Where do you get the 
CRC error? Linux or Mac?
----
2020-06-11 00:41:41 UTC - Sijie Guo: @Penghui Li can you help with the S3 
offloader error? I think it is related to per namespace offloading settings.
----
2020-06-11 03:36:31 UTC - Alexander Ursu: Hi, I was wondering if anyone here who 
has used the Pulsar SQL Presto implementation extensively has any 
recommendations on hardware requirements for the Presto coordinator running 
alone, or possibly with one or two workers.
----
2020-06-11 06:45:27 UTC - Penghui Li: There is a breaking change in 2.5.2 
related to the namespace-level offloader policy and the broker-level policy. We 
have already fixed it on the master branch. For now, you can work around it by 
setting the namespace-level offloader policy.
----
2020-06-11 06:59:15 UTC - Marcio Martins: @Matteo Merli Linux, it's the 
official pulsar docker images.
----
2020-06-11 07:00:45 UTC - Marcio Martins: @Penghui Li Thanks!
----
2020-06-11 08:30:19 UTC - jujugrrr: Any luck @Marcio Martins?
----
2020-06-11 09:07:55 UTC - Marcio Martins: No, I just added S3 permission to the 
node roles, and that's working - not optimal but also not worth stressing over, 
in my case. Will revisit later and hopefully it will be working as newer 
versions get picked up.
----
2020-06-11 09:10:23 UTC - Ken Huang: I created a function in a k8s environment. 
I can see a pod is active, but I found the function doesn't subscribe to the 
input topic.

I modified jobNamespace 
(<https://github.com/apache/pulsar/blob/master/deployment/kubernetes/helm/pulsar/templates/broker-configmap.yaml#L64>), 
serviceUrl 
(<https://github.com/apache/pulsar/blob/master/deployment/kubernetes/helm/pulsar/templates/broker-configmap.yaml#L68>) 
and adminUrl; I set the URLs to the proxy URL.

here is the function pod log
```"Downloaded successfully"
shardId=0
[2020-06-11 08:29:11 +0000] [INFO] python_instance_main.py: Starting Python 
instance with Namespace(client_auth_params='file:///etc/auth/token', 
client_auth_plugin='org.apache.pulsar.client.impl.auth.AuthenticationToken', 
cluster_name='pulsar-145', dependency_repository=None, 
expected_healthcheck_interval=-1, extra_dependency_repository=None, 
function_details='{"tenant":"public","namespace":"default","name":"ken","className":"test_func.ExclamationFunction1","runtime":"PYTHON","autoAck":true,"parallelism":1,"source":{"inputSpecs":{"persistent://public/default/intput":{}},"cleanupSubscription":true},"sink":{"topic":"persistent://public/default/output"},"resources":{"cpu":1.1,"ram":"1073741824","disk":"10737418240"},"componentType":"FUNCTION"}',
 function_id='dd305aeb-a773-45d9-aa48-765c4fa451e5', 
function_version='fff11148-ed0d-4a65-b79f-8c6b7182ac88', 
hostname_verification_enabled='false', install_usercode_dependencies=True, 
instance_id='0', 
logging_config_file='/pulsar/conf/functions-logging/console_logging_config.ini',
 logging_directory='logs/functions', logging_file='ken', 
max_buffered_tuples='1024', metrics_port=9094, port=9093, 
pulsar_serviceurl='pulsar://<my-proxy-url>/', py='/pulsar/test_func.py', 
secrets_provider='secretsprovider.ClearTextSecretsProvider', 
secrets_provider_config=None, state_storage_serviceurl=None, 
tls_allow_insecure_connection='false', tls_trust_cert_path=None, 
use_tls='false')
2020-06-11 08:29:11.916 INFO  Client:88 | Subscribing on Topic 
:persistent://public/default/intput
2020-06-11 08:29:11.916 INFO  ConnectionPool:85 | Created connection for 
pulsar://<my-proxy-url>/
2020-06-11 08:29:11.918 INFO  ClientConnection:330 | [10.244.166.160:35006 
-> <my-proxy-url>] Connected to broker
2020-06-11 08:29:12.312 INFO  HandlerBase:53 | 
[persistent://public/default/intput, public/default/ken, 0] Getting 
connection from pool
2020-06-11 08:29:12.350 INFO  ConnectionPool:85 | Created connection for 
pulsar://pulsar-145-broker-0.pulsar-145-broker.pulsar.svc.cluster.local:6650
2020-06-11 08:29:12.351 INFO  ClientConnection:332 | [10.244.166.160:35022 
-> <my-proxy-url>] Connected to broker through proxy. Logical broker: 
pulsar://pulsar-145-broker-0.pulsar-145-broker.pulsar.svc.cluster.local:6650
2020-06-11 08:29:12.439 INFO  ConsumerImpl:175 | 
[persistent://public/default/intput, public/default/ken, 0] Created consumer 
on broker [10.244.166.160:35022 -> <my-proxy-url>]
[2020-06-11 08:29:12 +0000] [INFO] server.py: Serving InstanceCommunication on 
port 9093
2020-06-11 08:39:12.313 INFO  ConsumerStatsImpl:64 | Consumer 
[persistent://public/default/intput, public/default/ken, 0] , 
ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, 
receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, 
totalAckedMsgMap_ = {})
2020-06-11 08:49:12.313 INFO  ConsumerStatsImpl:64 | Consumer 
[persistent://public/default/intput, public/default/ken, 0] , 
ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, 
receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, 
totalAckedMsgMap_ = {})```
----
