Hello,
As Petr Janeček suggested, you can enforce the time limit yourself with a
java.util.concurrent.Future. Below is a sample implementation that you can
adapt to your business logic.
- BoltTimeout - an interface for the work that should be time-limited.
- TimeoutImpl - runs the call through a Future and cancels the task if it
  exceeds the given timeout.
- BoltExecutor - implements BoltTimeout; port your code here.
- SampleBolt - its execute() method calls TimeoutImpl's timedCall() method.
BoltTimeout
import java.util.concurrent.TimeoutException;
import org.apache.storm.tuple.Tuple;

public interface BoltTimeout<T> {
    T call(Tuple tuple) throws InterruptedException, TimeoutException;
}
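BoltExecutor
import java.util.concurrent.TimeoutException;
import org.apache.storm.tuple.Tuple;

public class BoltExecutor implements BoltTimeout<Object>
{
    @Override
    public Object call(Tuple input) throws InterruptedException, TimeoutException
    {
        // Your business logic, i.e. the code you would otherwise write in
        // the bolt's execute() method. It must respond to thread
        // interruption, otherwise future.cancel(true) cannot stop it.
        return null;
    }
}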
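TimeoutImpl
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import org.apache.storm.tuple.Tuple;

public class TimeoutImpl
{
    private static final Logger LOGGER =
            Logger.getLogger(TimeoutImpl.class.getName());
    private final ExecutorService callTimeoutPool;
    private final long callTimeout; // milliseconds; <= 0 means wait without a limit

    public TimeoutImpl(ExecutorService callTimeoutPool, long callTimeout)
    {
        this.callTimeoutPool = callTimeoutPool;
        this.callTimeout = callTimeout;
    }

    public <T> T timedCall(final BoltTimeout<T> boltTimeout, final Tuple tuple)
            throws TimeoutException, ExecutionException, InterruptedException
    {
        Future<T> future = callTimeoutPool.submit(new Callable<T>() {
            @Override
            public T call() throws InterruptedException, TimeoutException {
                return boltTimeout.call(tuple);
            }
        });
        try
        {
            if (callTimeout > 0)
            {
                return future.get(callTimeout, TimeUnit.MILLISECONDS);
            }
            else
            {
                return future.get();
            }
        }
        catch (TimeoutException te)
        {
            // Interrupt the worker and let the caller fail the tuple.
            LOGGER.warning("Call timed out after " + callTimeout + " ms; cancelling");
            future.cancel(true);
            throw te;
        }
        catch (InterruptedException ie)
        {
            future.cancel(true);
            throw ie;
        }
    }
}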
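SampleBolt
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.tuple.Tuple;

public class SampleBolt implements IRichBolt
{
    private final long callTimeout = 1L; // timeout in milliseconds; set this to your limit
    private OutputCollector collector;
    private ExecutorService callTimeoutPool;
    private TimeoutImpl boltTimeout;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
    {
        this.collector = collector;
        // Create the pool once per bolt instance, not once per tuple.
        this.callTimeoutPool = Executors.newFixedThreadPool(10);
        this.boltTimeout = new TimeoutImpl(callTimeoutPool, callTimeout);
    }

    @Override
    public void execute(Tuple input)
    {
        try
        {
            boltTimeout.timedCall(new BoltExecutor(), input);
            collector.ack(input);
        }
        catch (Exception e)
        {
            // Timeout or failure in the business logic: fail the tuple, do not ack it.
            collector.fail(input);
        }
    }

    @Override
    public void cleanup()
    {
        callTimeoutPool.shutdownNow();
    }
    // declareOutputFields() and getComponentConfiguration() are omitted for brevity.
}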
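If you want to try the pattern without a Storm cluster, here is a minimal
standalone sketch of the same Future-based timeout using only
java.util.concurrent. The names TimeoutDemo and callWithTimeout are made up
for this illustration and are not part of Storm; the timeout values are
arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutDemo {

    // Hypothetical helper: runs the task on the pool and waits at most
    // timeoutMillis for its result.
    public static <T> T callWithTimeout(ExecutorService pool, Callable<T> task,
                                        long timeoutMillis)
            throws TimeoutException, ExecutionException, InterruptedException {
        Future<T> future = pool.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
            // cancel(true) interrupts the worker thread if the task is running.
            future.cancel(true);
            throw te;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // A task that finishes well within the limit returns its value.
            String fast = callWithTimeout(pool, () -> "done", 1000);
            System.out.println("fast task returned: " + fast);

            // A task that sleeps past the limit is cancelled. Thread.sleep()
            // reacts to the interrupt, so the worker thread is freed again.
            try {
                callWithTimeout(pool, () -> {
                    Thread.sleep(5000);
                    return "late";
                }, 100);
            } catch (TimeoutException te) {
                System.out.println("slow task timed out and was cancelled");
            }
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Note that cancel(true) only interrupts the worker thread. Code that ignores
interruption (for example a busy loop that never checks the interrupt flag)
keeps running in the pool after the timeout.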
I hope this helps you solve the problem.
With regards,
Gowtham S, MCA
PH: 9597000673
On Mon, 15 Jul 2019 at 18:30, Petr Janeček <[email protected]> wrote:
> Hello,
> there's no such option as of now. If you want to suggest it as a future
> feature, please raise a Jira.
> There's a plethora of ways to do what you want. For example, you can
> constrain your own code with a Guava TimeLimiter (
> https://guava.dev/releases/snapshot/api/docs/com/google/common/util/concurrent/TimeLimiter.html).
> Or you can create a decorator Bolt, TimeLimitedBolt, wrapping your existing
> bolt and forcing its execute() method to a time limit.
> Either way, you must make sure the long-running operation responds
> correctly to thread interruption.
>
> PJ
>
>
> ---------- Original e-mail ----------
> From: Thomas Julian <[email protected]>
> To: user <[email protected]>
> Date: 15. 7. 2019 11:12:24
> Subject: Request for configuration to set a processing timeout value for a
> Bolt
>
> Hello,
>
> We want to set a processing time limit for a bolt based on a
> configuration.
> Are there any default configurations available in Storm to timeout a bolt?
>
> Thanks & Regards,
> Julian
>
>
>