Hey Stig,

Thanks for looking into this and providing a fix. We have built the storm-kafka-client jar from your branch and it is working fine. We are still testing it and will let you know if we find any issues.
I see that for STORM-3301 <https://issues.apache.org/jira/browse/STORM-3301> you have set the fix version to 1.2.3. Is there an approximate date for when it will be released?

Thanks for your help :)

Best Regards
Saurabh Kumar Mimani

On Mon, Dec 10, 2018 at 1:12 AM Stig Rohde Døssing <[email protected]> wrote:

> I'm assuming you applied the fix on top of 1.2.1 or something like that?
> The exception can't be thrown from the branch I linked, since it was
> removed in an earlier commit in 1.x-branch.
>
> Your logs show that the committed offset for partition 6 is 1682098 (98
> for short). 98 was emitted, since it shows up in the emitted list. I'm
> guessing it failed and was replayed. 99 and up are in the acked list, so
> they are ready to commit as soon as 98 finishes processing.
>
> The log shows that 99 is the tuple encountering the exception, so I'm
> guessing what happened is that 98 was acked and the spout decided to commit
> 98, 99 etc. For some reason it then still decides to emit 99. The only
> reasons I can think of (barring bugs in Kafka/the Kafka client) for that to
> happen would be that 99 is in waitingToEmit and isn't being cleared out
> during the commit (this is the bug I tried to fix), somehow 99 is still
> queued for retry (this should not be possible) or for some reason the
> consumer position ends up below the committed offset. I think the best bet
> for tracking down why it happens would be logging the contents of the
> RetryService, the contents of waitingToEmit and the consumer position both
> after commitOffsetsForAckedTuples, and right before the exception is
> thrown.
>
> Could you try logging those? I can add the log statements on top of the
> bugfix if needed.
>
> On Sun, Dec 9, 2018 at 18:42, saurabh mimani <[email protected]> wrote:
>
>> Hey, I see this is still happening. This time it seems to be because the
>> same offset from a different partition was committed (guessing from the
>> logs), but I'm not sure, as that should be handled.
>>
>> Please find the logs here
>> <https://gist.github.com/mimani/ff27b7272482efc91c4d145d59ab59be>.
>>
>> Best Regards
>> Saurabh Kumar Mimani
>>
>> On Sun, Dec 9, 2018 at 3:19 AM Stig Rohde Døssing <[email protected]>
>> wrote:
>>
>>> I believe I have a fix, your logs were helpful. Please try out the
>>> changes in https://github.com/apache/storm/pull/2923/files.
>>>
>>> On Sat, Dec 8, 2018 at 07:25, saurabh mimani <[email protected]> wrote:
>>>
>>>> Hey,
>>>>
>>>> Thanks for looking into this. I was not able to reproduce this earlier
>>>> on my local machine; however, I will try once more. I was consistently
>>>> able to reproduce it with parallelism of 5 for boltA and parallelism of
>>>> 200 with boltB on 2 machines in cluster mode.
>>>>
>>>> I will try again with your code.
>>>>
>>>> These <https://gist.github.com/mimani/56dd31db34e4356b25c796d78261f7b8>
>>>> are logs of the Kafka spout from when I was able to reproduce it in
>>>> cluster mode with my topology, in case they help.
>>>>
>>>> Best Regards
>>>> Saurabh Kumar Mimani
>>>>
>>>> On Wed, Dec 5, 2018 at 11:33 PM Stig Rohde Døssing <[email protected]>
>>>> wrote:
>>>>
>>>>> I can't reproduce this.
>>>>>
>>>>> I created a test topology similar to the code you posted, based on the
>>>>> 1.2.1 release tag
>>>>> https://github.com/srdo/storm/commit/f5577f7a773f3d433b90a2670de5329b396f5564
>>>>> .
>>>>>
>>>>> I set up a local Kafka instance and put enough messages to run the
>>>>> topology for 15 minutes or so in the test topic. After populating the
>>>>> topic, I started the topology and let it run until it reached the end of
>>>>> the topic. As expected a lot of messages failed, but after a while it
>>>>> managed to successfully process all messages. I didn't see any worker
>>>>> crashes, and the logs only show some errors related to moving files
>>>>> (expected on Windows).
>>>>>
>>>>> The topology seems to work fine against both Kafka 0.10.2.2 and 1.1.1,
>>>>> though 1.1.1 is slower due to
>>>>> https://issues.apache.org/jira/browse/STORM-3102.
>>>>>
>>>>> The Kafka setup was with the default configuration for both Kafka and
>>>>> Zookeeper, and Storm was set up with a local Nimbus, single local
>>>>> Supervisor and 4 worker slots.
>>>>>
>>>>> Saurabh please try to reproduce the issue using the topology I linked.
>>>>> If you need to make adjustments in order to provoke the issue, please
>>>>> update the code and link it so I can check it out and try it.
>>>>>
>>>>> On Wed, Dec 5, 2018 at 16:42, saurabh mimani <[email protected]> wrote:
>>>>>
>>>>>> No, I have checked that, there is no other consumer group consuming
>>>>>> from the same.
>>>>>>
>>>>>> Thanks for looking into it, let me know if you need any
>>>>>> other information.
>>>>>>
>>>>>> Best Regards
>>>>>> Saurabh Kumar Mimani
>>>>>>
>>>>>> On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde Døssing <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Ravi, BaseBasicBolt does automatically anchor any emitted tuples to
>>>>>>> the input tuple. It's intended for bolts that just need to receive a
>>>>>>> tuple, synchronously process it and emit some new tuples anchored to
>>>>>>> the input tuple. It's there because doing manual acking is tedious
>>>>>>> and error-prone in cases where you don't need to be able to e.g. emit
>>>>>>> new unanchored tuples or ack the input tuple asynchronously. As Peter
>>>>>>> mentioned, the BasicBoltExecutor (
>>>>>>> https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42)
>>>>>>> handles acking for you.
>>>>>>>
>>>>>>> Saurabh, I'll see if I can reproduce your issue. Please also check
>>>>>>> that you don't have any other consumers using the same consumer group
>>>>>>> as the spout.
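
The anchoring and acking flow described above for BaseBasicBolt can be sketched without Storm on the classpath. This is only a rough model under stated assumptions: `InputTuple`, `RecordingCollector`, `BasicBolt` and `BasicExecutorSketch` are hypothetical stand-ins invented for illustration, not Storm's real classes, and the local `FailedException` merely mimics the Storm exception of the same name.

```scala
// Simplified stand-ins for Storm's classes; none of this is the real Storm API.
final case class InputTuple(id: Long)
class FailedException extends RuntimeException

// Records emits, acks and fails so the behaviour can be inspected.
class RecordingCollector {
  private var context: Option[InputTuple] = None
  var emitted: Vector[(InputTuple, String)] = Vector.empty // (anchor, value)
  var acked: Vector[InputTuple] = Vector.empty
  var failed: Vector[InputTuple] = Vector.empty

  def setContext(input: InputTuple): Unit = { context = Some(input) }

  // An emit is anchored to the input tuple currently in context
  // (and silently ignored in this sketch if no context has been set).
  def emit(value: String): Unit =
    context.foreach(anchor => emitted = emitted :+ ((anchor, value)))

  def ack(input: InputTuple): Unit = { acked = acked :+ input }
  def fail(input: InputTuple): Unit = { failed = failed :+ input }
}

trait BasicBolt {
  def execute(input: InputTuple, collector: RecordingCollector): Unit
}

// Set the context, run the bolt, then ack on success or fail on FailedException.
class BasicExecutorSketch(bolt: BasicBolt, collector: RecordingCollector) {
  def execute(input: InputTuple): Unit = {
    collector.setContext(input)
    try {
      bolt.execute(input, collector)
      collector.ack(input)
    } catch {
      case _: FailedException => collector.fail(input)
    }
  }
}
```

In this model the bolt never calls ack or fail itself, which is the point made in the thread: the wrapper does it, so a bolt that throws FailedException causes its input tuple to be failed rather than acked.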
>>>>>>>
>>>>>>> On Wed, Dec 5, 2018 at 11:53, Peter Chamberlain <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Pretty sure that the ack path is handled by BasicBoltExecutor (an
>>>>>>>> implementation of IRichBolt), which calls
>>>>>>>> collector.setContext(input), and also does the acking inside its
>>>>>>>> execute function, and in between calls the BaseBasicBolt.execute
>>>>>>>> version (which takes the collector as well as the tuple as
>>>>>>>> parameters).
>>>>>>>> So the intention is clearly that it is automatically anchored and
>>>>>>>> acknowledged.
>>>>>>>>
>>>>>>>> On Wed, 5 Dec 2018 at 09:57, Ravi Sharma <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Saurabh,
>>>>>>>>> I checked the BaseBasicBolt which comes with storm, it doesn't do
>>>>>>>>> much.
>>>>>>>>> Also checked BasicOutputCollector and don't see how it will
>>>>>>>>> anchor automatically unless you call
>>>>>>>>> BasicOutputCollector.setContext(Tuple tuple). I don't see all of
>>>>>>>>> your code, but I don't see this call in your boltA code.
>>>>>>>>> Also it looks like even when you make this setContext call, after
>>>>>>>>> that you will have to emit using the following emit function
>>>>>>>>>
>>>>>>>>> BasicOutputCollector.emit(String streamId, List<Object> tuples)
>>>>>>>>>
>>>>>>>>> Basically just check that whatever emit function is called, it
>>>>>>>>> does pass the input tuple.
>>>>>>>>>
>>>>>>>>> Once I had exactly the same issue for a few days, but mine was
>>>>>>>>> related to config. I wanted to read from two Kafka topics in one
>>>>>>>>> topology and had two different KafkaSpouts created; mistakenly I
>>>>>>>>> copy-pasted the same config and that caused this issue. Not sure if
>>>>>>>>> that applies to your scenario.
>>>>>>>>>
>>>>>>>>> *NOTE*: I checked the latest storm master branch for code.
>>>>>>>>>
>>>>>>>>> On Wed, 5 Dec 2018, 08:11 saurabh mimani <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Ravi,
>>>>>>>>>>
>>>>>>>>>> I am using *BaseBasicBolt*, which as described here
>>>>>>>>>> <http://storm.apache.org/releases/1.0.6/Guaranteeing-message-processing.html>
>>>>>>>>>> : Tuples emitted to BasicOutputCollector are automatically
>>>>>>>>>> anchored to the input tuple, and the input tuple is acked for you
>>>>>>>>>> automatically when the execute method completes.
>>>>>>>>>>
>>>>>>>>>> What you are saying is applicable for *BaseRichBolt*. The Kafka
>>>>>>>>>> spout I am using is from the storm-kafka-client
>>>>>>>>>> <https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client/1.2.1>
>>>>>>>>>> library, so unique ID, etc. should already be taken care of.
>>>>>>>>>>
>>>>>>>>>> Best Regards
>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 5, 2018 at 12:52 PM Ravi Sharma <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>> I think there is an issue with the part of the code which is
>>>>>>>>>>> emitting the tuples.
>>>>>>>>>>>
>>>>>>>>>>> If you want to use the ack mechanism, you need to use an anchor
>>>>>>>>>>> tuple when you emit from bolts.
>>>>>>>>>>>
>>>>>>>>>>> Collector.emit(Tuple input, Values data)
>>>>>>>>>>>
>>>>>>>>>>> Also make sure the Kafka spout emits tuples with a unique id.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Ravi
>>>>>>>>>>>
>>>>>>>>>>> On Wed, 5 Dec 2018, 06:35 saurabh mimani <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your reply.
>>>>>>>>>>>> What you are saying is correct. However, I am able to reproduce
>>>>>>>>>>>> it more often, and I think it happens when multiple tuples fail
>>>>>>>>>>>> on the first run but all of them succeed on retry; something of
>>>>>>>>>>>> that sort is happening.
>>>>>>>>>>>>
>>>>>>>>>>>> This can be reproduced easily using the following two bolts and
>>>>>>>>>>>> KafkaSpout, by running in cluster mode for 3-4 minutes:
>>>>>>>>>>>>
>>>>>>>>>>>> *BoltA*
>>>>>>>>>>>>
>>>>>>>>>>>> import scala.util.Random
>>>>>>>>>>>> import org.apache.storm.topology.{BasicOutputCollector, FailedException, OutputFieldsDeclarer}
>>>>>>>>>>>> import org.apache.storm.topology.base.BaseBasicBolt
>>>>>>>>>>>> import org.apache.storm.tuple.{Fields, Tuple, Values}
>>>>>>>>>>>>
>>>>>>>>>>>> // getObj and getJsonBytes are (de)serialization helpers from our own code base.
>>>>>>>>>>>> case class Abc(index: Int, rand: Boolean)
>>>>>>>>>>>>
>>>>>>>>>>>> class BoltA extends BaseBasicBolt {
>>>>>>>>>>>>
>>>>>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>>>>>     val inp = input.getBinaryByField("value").getObj[someObj]
>>>>>>>>>>>>     val randomGenerator = new Random()
>>>>>>>>>>>>     val rand = randomGenerator.nextBoolean()
>>>>>>>>>>>>
>>>>>>>>>>>>     // Emit 100 tuples (index 0 to 99) for every input tuple.
>>>>>>>>>>>>     (0 until 100).foreach { i =>
>>>>>>>>>>>>       collector.emit(new Values(Abc(i, rand).getJsonBytes))
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>>     declarer.declare(new Fields("boltAout"))
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> *BoltB*
>>>>>>>>>>>>
>>>>>>>>>>>> class BoltB extends BaseBasicBolt {
>>>>>>>>>>>>
>>>>>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>>>>>     val abc = input.getBinaryByField("boltAout").getObj[Abc]
>>>>>>>>>>>>     println(s"Received ${abc.index}th tuple in BoltB")
>>>>>>>>>>>>     // Fail the last three tuples of a batch whenever rand is true.
>>>>>>>>>>>>     if (abc.index >= 97 && abc.rand) {
>>>>>>>>>>>>       println(s"throwing FailedException for ${abc.index}th tuple")
>>>>>>>>>>>>       throw new FailedException()
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> *KafkaSpout:*
>>>>>>>>>>>>
>>>>>>>>>>>> private def getKafkaSpoutConfig(source: Config) =
>>>>>>>>>>>>   KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
>>>>>>>>>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
>>>>>>>>>>>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>>>>>>>>>>>>       "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>>>>>>>     .setOffsetCommitPeriodMs(100)
>>>>>>>>>>>>     .setRetry(new KafkaSpoutRetryExponentialBackoff(
>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>>       10,
>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
>>>>>>>>>>>>     ))
>>>>>>>>>>>>     .setFirstPollOffsetStrategy(offsetStrategyMapping(
>>>>>>>>>>>>       ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy",
>>>>>>>>>>>>         "UNCOMMITTED_EARLIEST")))
>>>>>>>>>>>>     .setMaxUncommittedOffsets(
>>>>>>>>>>>>       ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset",
>>>>>>>>>>>>         10000))
>>>>>>>>>>>>     .build()
>>>>>>>>>>>>
>>>>>>>>>>>> Other config:
>>>>>>>>>>>>
>>>>>>>>>>>> messageTimeoutInSecons: 300
>>>>>>>>>>>>
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde Døssing <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>>>>
>>>>>>>>>>>>> The tuple emitted by the spout will only be acked once all
>>>>>>>>>>>>> branches of the tuple tree have been acked, i.e. all 100 tuples
>>>>>>>>>>>>> are acked.
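
The "acked only once every branch of the tuple tree is acked" rule above can be illustrated with a small XOR tracker, loosely modelled on how Storm's acker is documented to work (XOR-ing tuple ids until the running value returns to zero). Treat this as a sketch under assumptions: `AckTracker` and its method names are made up for illustration, and it uses small fixed ids so the example is deterministic, whereas Storm uses random 64-bit ids.

```scala
// Hypothetical, simplified model of XOR-based tuple-tree tracking.
final class AckTracker {
  private var ackVal: Long = 0L

  // A tuple joins the tree (the spout tuple itself, or an anchored emit).
  def anchored(tupleId: Long): Unit = { ackVal ^= tupleId }

  // A tuple in the tree has been fully processed.
  def acked(tupleId: Long): Unit = { ackVal ^= tupleId }

  // Every id has been XOR-ed in twice exactly when nothing is pending.
  // With small hand-picked ids a pending subset could also XOR to zero;
  // random 64-bit ids make that kind of collision very unlikely.
  def treeComplete: Boolean = ackVal == 0L
}

object AckTrackerDemo {
  def run(): (Boolean, Boolean) = {
    val tracker = new AckTracker
    tracker.anchored(1000L)                 // spout tuple
    (1L to 100L).foreach(tracker.anchored)  // 100 tuples anchored to it
    tracker.acked(1000L)
    (1L to 99L).foreach(tracker.acked)      // 99 of the 100 are acked
    val beforeLast = tracker.treeComplete   // one tuple still pending
    tracker.acked(100L)
    (beforeLast, tracker.treeComplete)
  }
}
```

The spout tuple would only be reported as acked at the second check, after the last of the 100 anchored tuples has been acked.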
>>>>>>>>>>>>>
>>>>>>>>>>>>> The error you're seeing was added as part of
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-2666 to try to
>>>>>>>>>>>>> avoid having that bug pop up again. Could you try posting your
>>>>>>>>>>>>> spout configuration? Also if possible, it would be helpful if
>>>>>>>>>>>>> you could enable debug logging for
>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout and maybe also
>>>>>>>>>>>>> org.apache.storm.kafka.spout.internal.OffsetManager.
>>>>>>>>>>>>> They log when offsets are committed (e.g.
>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546),
>>>>>>>>>>>>> and also when the consumer position is changed (e.g.
>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561
>>>>>>>>>>>>> ). It should be possible to track down when/why the consumer
>>>>>>>>>>>>> position wound up behind the committed offset.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Just so you're aware, the check that crashes the spout has
>>>>>>>>>>>>> been removed as of
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-3102. I'd still
>>>>>>>>>>>>> like to know if there's a bug in the spout causing it to emit
>>>>>>>>>>>>> tuples that were already committed though.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 11:29, saurabh mimani <[email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Version Info:
>>>>>>>>>>>>>> "org.apache.storm" % "storm-core" % "1.2.1"
>>>>>>>>>>>>>> "org.apache.storm" % "storm-kafka-client" % "1.2.1"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have a storm topology which looks like the following:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> boltA -> boltB -> boltC -> boltD
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> boltA just does some formatting of requests and emits
>>>>>>>>>>>>>> another tuple.
>>>>>>>>>>>>>> boltB does some processing and emits around 100 tuples for
>>>>>>>>>>>>>> each tuple received. boltC and boltD process these tuples.
>>>>>>>>>>>>>> All the bolts implement BaseBasicBolt.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What I am noticing is that whenever boltD marks some tuple as
>>>>>>>>>>>>>> failed and marks it for retry by throwing FailedException,
>>>>>>>>>>>>>> after a few minutes (less than my topology timeout) I get the
>>>>>>>>>>>>>> following error:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
>>>>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>>>>>>>     at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>>>>>>>     at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>>>>>>>     at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>>>>>>>     at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>     at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>>>>>> 2018-11-30T20:01:05.262+05:30 executor [ERROR]
>>>>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>>>>>>>     at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>>>>>>>     at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>>>>>>>     at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>>>>>>>     at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>     at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This seems to happen when boltB emits 100 tuples for 1 input
>>>>>>>>>>>>>> tuple and boltD fails one of those 100 tuples. I am not able
>>>>>>>>>>>>>> to understand how to fix this. Ideally the original tuple
>>>>>>>>>>>>>> should be acked only when all 100 tuples are acked, but the
>>>>>>>>>>>>>> original tuple is probably being acked before all 100 tuples
>>>>>>>>>>>>>> are acked, which causes this error.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>
>>>>>>>> --
>>>>>>>> *Peter Chamberlain* | Senior Software Engineer | HTK
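
Stig's reading of the logs near the top of this thread (committed offset 98, with 99 and up already acked but not yet committable) implies that only a contiguous run of acked offsets, starting at the committed offset, can be committed. A minimal sketch of that rule follows, assuming the committed offset denotes the first offset not yet committed; `OffsetSketch` and `nextCommitOffset` are hypothetical names and this is not the spout's actual OffsetManager code.

```scala
object OffsetSketch {
  // Returns the new committed offset: advance past every acked offset that
  // forms an unbroken run starting at the current committed offset.
  def nextCommitOffset(committed: Long, acked: Set[Long]): Long = {
    var next = committed
    while (acked.contains(next)) next += 1
    next
  }
}
```

With the offsets from the thread shortened to two digits, `OffsetSketch.nextCommitOffset(98L, Set(99L, 100L, 101L))` stays at 98 because 98 itself is still pending, while adding 98 to the acked set lets the commit jump to 102 in one step.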
