Hello again,

Attached is a simplified reproduction (without the ShellSpout, but the concepts 
are the same).


It seems that ack() and nextTuple() are always called on the same thread. That 
means there is an inherent tradeoff.

Either nextTuple sleeps only a few ms (and then the ShellSpout would serialize a lot 
of nextTuple messages)

or nextTuple can sleep longer, but then the ack is delayed.


Is there a way around this limitation?
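
One pattern that might help, at least for a JVM spout, is to move the blocking 
wait onto a background thread and let nextTuple do a non-blocking poll. Below is 
a minimal plain-Java sketch of that idea. It does not use Storm at all: the class 
NonBlockingSpoutSketch and its open/nextTuple/close methods are hypothetical 
stand-ins that only mirror the spout method names, and the "every 4th message is 
slow" rule is copied from the attached reproduction. Whether this also reduces 
the number of nextTuple messages a ShellSpout serializes is not something the 
sketch addresses.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a spout; plain Java, not a Storm class. */
class NonBlockingSpoutSketch {
    // Messages already read from the (mock) external queue, waiting to be emitted.
    private final BlockingQueue<Integer> pending = new LinkedBlockingQueue<>();
    private final Thread reader;

    NonBlockingSpoutSketch(int messageCount, long slowWaitMillis) {
        // The slow wait happens here, off the thread that calls nextTuple().
        reader = new Thread(() -> {
            try {
                for (int id = 0; id < messageCount; id++) {
                    if (id % 4 == 0) {
                        Thread.sleep(slowWaitMillis); // message arrived late to queue
                    }
                    pending.put(id);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.setDaemon(true);
    }

    public void open() {
        reader.start();
    }

    /** Returns the next message id, or null immediately if none is ready. */
    public Integer nextTuple() {
        return pending.poll();
    }

    public void close() {
        reader.interrupt();
    }

    public static void main(String[] args) throws InterruptedException {
        final int total = 8;
        NonBlockingSpoutSketch spout = new NonBlockingSpoutSketch(total, 200);
        spout.open();

        int emitted = 0;
        long longestCallNanos = 0;
        while (emitted < total) {
            long start = System.nanoTime();
            Integer id = spout.nextTuple();
            longestCallNanos = Math.max(longestCallNanos, System.nanoTime() - start);
            if (id == null) {
                Thread.sleep(1); // stand-in for whatever the caller does when idle
            } else {
                emitted++;
            }
        }
        spout.close();
        System.out.println("emitted=" + emitted + " longestNextTupleMs="
                + TimeUnit.NANOSECONDS.toMillis(longestCallNanos));
    }
}
```

Since nextTuple never waits on the slow read here, the calling thread would stay 
free to handle acks between calls. The cost is one extra thread per spout 
instance and an unbounded in-memory buffer, which a real spout would probably 
want to cap.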


Itai

________________________________
From: Itai Frenkel <[email protected]>
Sent: Thursday, July 17, 2014 9:42 PM
To: [email protected]
Subject: Acking is delayed by 5 seconds (in disruptor queue ?)

Hello,

I have noticed that an ack takes 5 seconds to pass from the bolt to the spout 
(see debug log below). It is a simple topology with 1 spout, 1 bolt and 1 acker 
all running on the same worker. The spout and the bolt are ShellSpout and 
ShellBolt respectively.

It looks like the message is delayed in the LMAX Disruptor queue (?).
How can I reduce this delay to ~1 ms?

Regards,
Itai


2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Sent process to tuple 2759481868963667531
2014-07-17 18:30:30 b.s.d.task [INFO] Emitting: bolt __ack_ack [-357211617823660063 -3928495599512172728]
2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Bolt sent ack to tuple 2759481868963667531
2014-07-17 18:30:30 b.s.d.executor [INFO] Processing received message source: bolt:2, stream: __ack_ack, id: {}, [-357211617823660063 -3928495599512172728]
2014-07-17 18:30:30 b.s.d.task [INFO] Emitting direct: 3; __acker __ack_ack [-357211617823660063]
2014-07-17 18:30:35 b.s.d.executor [INFO] Processing received message source: __acker:1, stream: __ack_ack, id: {}, [-357211617823660063]
2014-07-17 18:30:35 b.s.d.executor [INFO] Acking message 1138



/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import com.google.common.collect.Maps;
import jline.internal.Log;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * This topology demonstrates how a sleep inside the spout's nextTuple() delays ack() and inflates ack latency.
 */
public class AckDelayedByNextTupleTopology {


    public static class MockQueueConsumerSpout extends BaseRichSpout {
        private SpoutOutputCollector _collector;
        private final AtomicInteger _lastId = new AtomicInteger(0);
        private final Map<Integer,Long> startTimestampPerId = Maps.newConcurrentMap();

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
        }

        @Override
        public void nextTuple() {
            final int messageId = mockRemoveMessageFromQueue(5000);
            _collector.emit(new Values(""), messageId);
            startLatencyRecording(messageId);

            final String threadId = "[Thread-"+Thread.currentThread().getId()+"]";
            Log.info(threadId + "emit messageId=" + messageId);
        }

        @Override
        public void ack(Object id) {
            final String threadId = "[Thread-"+Thread.currentThread().getId()+"]";
            final long latency = stopLatencyRecording(id);
            Log.info(threadId + "ack messageId=" + id+ " latency="+ latency + "ms");
        }

        @Override
        public void fail(Object id) {
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        private void startLatencyRecording(int messageId) {
            startTimestampPerId.put(messageId, System.currentTimeMillis());
        }

        private long stopLatencyRecording(Object id) {
            return System.currentTimeMillis() - startTimestampPerId.remove(id);
        }

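        /**
         * Simulates a blocking read from an external queue: every 4th message
         * blocks the calling thread for the full timeout before it is returned.
         */
        private int mockRemoveMessageFromQueue(long waitTimeoutMilliseconds) {
            final int messageId = _lastId.getAndIncrement();
            if (messageId % 4 == 0) {
                //message arrived late to queue
                Utils.sleep(waitTimeoutMilliseconds);
            }
            return messageId;
        }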
    }

  public static class NopBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }
  }

  public static void main(String[] args) throws Exception {

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new MockQueueConsumerSpout(), 1);
    builder.setBolt("count", new NopBolt(), 1).localOrShuffleGrouping("spout");

    Config conf = new Config();
    conf.setDebug(false);
    conf.setNumWorkers(1);
    conf.setNumAckers(1);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());

    Thread.sleep(60*1000);
    cluster.shutdown();

  }
}
