Hi All,

I can't configure Memcached to be used with Trident. It would be a big help
if you could tell me what I am doing wrong.

I want to run my topology in distributed mode, not in local mode.

Here is what I have done:

1. Installed `Memcached` using `sudo apt-get install memcached`.
2. Added `trident-memcached`[1] as a dependency in my `pom.xml` file.
3. Wrote a Trident topology (with a DRPC stream).
4. Started ZooKeeper, Nimbus, the supervisors (on 3 nodes), the DRPC server,
and the Memcached server.
5. Submitted the topology.

But when I submit my topology, I see the following error on the worker
nodes:

2015-08-05 09:17:30 b.s.util [ERROR] Async loop died!
java.lang.AbstractMethodError: trident.memcached.MemcachedState$Factory.makeState(Ljava/util/Map;Lbacktype/storm/task/IMetricsContext;II)Lstorm/trident/state/State;
    at storm.trident.planner.SubtopologyBolt.prepare(SubtopologyBolt.java:69) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:231) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80]
2015-08-05 09:17:30 b.s.d.executor [ERROR]
java.lang.AbstractMethodError: trident.memcached.MemcachedState$Factory.makeState(Ljava/util/Map;Lbacktype/storm/task/IMetricsContext;II)Lstorm/trident/state/State;
    at storm.trident.planner.SubtopologyBolt.prepare(SubtopologyBolt.java:69) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:231) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80]
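
The method descriptor in the error decodes to a Java method of the form
State makeState(Map, IMetricsContext, int, int). An AbstractMethodError
generally means that a class does not provide a concrete implementation of a
method its caller expects, which can happen when a jar was compiled against a
different version of an interface than the one present at runtime. Whether
that is the cause here is not confirmed. The following is only a minimal
sketch of how such a check could be done with reflection. The types State,
IMetricsContext, StateFactory, ThreeArgFactory and FourArgFactory are
hypothetical stand-ins so that the example compiles without Storm on the
classpath; they are not the real Storm or trident-memcached classes.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Map;

public class MakeStateCheck {
    // Hypothetical stand-ins for storm.trident.state.State and
    // backtype.storm.task.IMetricsContext (assumptions, not the real types).
    interface State {}
    interface IMetricsContext {}

    // Stand-in for an interface declaring the method named in the error.
    interface StateFactory {
        State makeState(Map<?, ?> conf, IMetricsContext metrics,
                        int partitionIndex, int numPartitions);
    }

    // Hypothetical factory that only offers a three-argument makeState.
    // It does not implement StateFactory, otherwise it would not compile.
    static class ThreeArgFactory {
        public State makeState(Map<?, ?> conf, int partitionIndex, int numPartitions) {
            return null;
        }
    }

    // Hypothetical factory that implements the four-argument method.
    static class FourArgFactory implements StateFactory {
        @Override
        public State makeState(Map<?, ?> conf, IMetricsContext metrics,
                               int partitionIndex, int numPartitions) {
            return null;
        }
    }

    // Returns true if cls or one of its superclasses declares a
    // non-abstract method with the given name and parameter types.
    static boolean hasConcreteMethod(Class<?> cls, String name, Class<?>... params) {
        for (Class<?> c = cls; c != null; c = c.getSuperclass()) {
            try {
                Method m = c.getDeclaredMethod(name, params);
                return !Modifier.isAbstract(m.getModifiers());
            } catch (NoSuchMethodException e) {
                // Not declared on this class; continue with the superclass.
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Class<?>[] sig = {Map.class, IMetricsContext.class, int.class, int.class};
        System.out.println("ThreeArgFactory: "
                + hasConcreteMethod(ThreeArgFactory.class, "makeState", sig));
        System.out.println("FourArgFactory: "
                + hasConcreteMethod(FourArgFactory.class, "makeState", sig));
    }
}
```

To run the same check against the real jars, one would presumably load the
class named in the stack trace with
Class.forName("trident.memcached.MemcachedState$Factory") and pass the real
Storm parameter types instead of the stand-ins.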

I have attached my code for your kind reference.

Thanks in advance.

Regards,
Thilina
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package tutorial.storm.trident;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.Split;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.DRPCClient;
import backtype.storm.utils.Utils;
import trident.memcached.MemcachedState;

import java.net.InetSocketAddress;
import java.util.Arrays;

public class Demo2 {
    public static void main(String[] args) throws Exception {
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("apple apple"),
                new Values("ball ball"),
                new Values("cat cat"),
                new Values("dog dog"));
        spout.setCycle(true);

        Config conf = new Config();
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, basicStateAndDRPC(spout));
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, basicStateAndDRPC(spout));
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }

        DRPCClient client = new DRPCClient("localhost", 3772);
        while (true) {
            System.out.println(client.execute("words", "apple ball cat dog"));
            Thread.sleep(100000);
        }
    }

    private static StormTopology basicStateAndDRPC(FixedBatchSpout spout) {
        TridentTopology topology = new TridentTopology();

        TridentState wordCounts =
                topology.newStream("spout1", spout)
                        .parallelismHint(1)
                        .each(new Fields("sentence"), new Split(), new Fields("word"))
                        .groupBy(new Fields("word"))
                        .persistentAggregate(
                                MemcachedState.transactional(
                                        Arrays.asList(new InetSocketAddress("localhost", 11211))),
                                new Count(),
                                new Fields("count"))
                        .parallelismHint(1);

        topology
                .newDRPCStream("words")
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull());

        return topology.build();
    }
}
