Hi All, I can't configure Memcached to be used with trident. It will be a big help If you can tell me what I am doing wrong.
I want to run my topology in distributed mode, not in the local mode. Here is what I have done: 1. Installed `Memcached` using `sudo apt-get install memcached`. 2. Added `trident-memcached`[1] as a dependency in my `pom.xml` file. 3. Wrote a trident topology (with a DRPC stream). 4. Run zookeeper, nimbus, supervisor(in 3 nodes), drpc server and memcached server. 5. Submit the topology But when I submit my topology, I can see the following error in the worker nodes. 2015-08-05 09:17:30 b.s.util [ERROR] Async loop died! java.lang.AbstractMethodError: trident.memcached.MemcachedState$Factory.makeState(Ljava/util/Map;Lbacktype/storm/task/IMetricsContext;II)Lstorm/trident/state/State; at storm.trident.planner.SubtopologyBolt.prepare(SubtopologyBolt.java:69) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:231) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80] 2015-08-05 09:17:30 b.s.d.executor [ERROR] java.lang.AbstractMethodError: trident.memcached.MemcachedState$Factory.makeState(Ljava/util/Map;Lbacktype/storm/task/IMetricsContext;II)Lstorm/trident/state/State; at storm.trident.planner.SubtopologyBolt.prepare(SubtopologyBolt.java:69) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:231) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80] I have attached my code for your kind reference. Thanks in advance. Regards, Thilina
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package tutorial.storm.trident; import storm.trident.TridentState; import storm.trident.TridentTopology; import storm.trident.operation.builtin.Count; import storm.trident.operation.builtin.FilterNull; import storm.trident.operation.builtin.MapGet; import storm.trident.testing.FixedBatchSpout; import storm.trident.testing.Split; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.DRPCClient; import backtype.storm.utils.Utils; import trident.memcached.MemcachedState; import java.net.InetSocketAddress; import java.util.Arrays; public class Demo2 { public static void main(String[] args) throws Exception { FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("apple apple"), new Values("ball ball"), new Values("cat cat"), new Values("dog dog")); spout.setCycle(true); Config conf = new Config(); conf.setDebug(false); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, basicStateAndDRPC(spout)); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, basicStateAndDRPC(spout)); Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown(); } DRPCClient client = new DRPCClient("localhost", 3772); while (true) { System.out.println(client.execute("words", "apple ball cat dog")); Thread.sleep(100000); } } private static StormTopology basicStateAndDRPC(FixedBatchSpout spout) { TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .parallelismHint(1) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(MemcachedState.transactional(Arrays.asList(new InetSocketAddress("localhost", 11211))), new Count(), new Fields("count")) .parallelismHint(1); topology .newDRPCStream("words") .each(new Fields("args"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")) .each(new Fields("count"), new FilterNull()); return topology.build(); } }
