Hi, thanks to all
Sorry for the mistake — in this WordCount the input was split into lines, so
each line triggers a call to the map function. That was the problem, so I
changed the code. When I run WordCount without the stream receiver that
updates word counts in the cache, everything works fine, but when I add the
stream receiver I get the error below.
/**
 * Hadoop WordCount job that streams per-word counts into an Apache Ignite
 * cache from each map task via an {@link IgniteDataStreamer}.
 */
public class WordCountExample {

    /**
     * Mapper that tokenizes each input line and feeds every token into the
     * Ignite data streamer. The Ignite client node is started once per task
     * in {@link #setup} and shut down in {@link #cleanup}.
     */
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        // Making objects is expensive. Instantiate outside the loop and re-use.
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        Ignite ignite;
        IgniteCache<String, Long> cache;
        IgniteDataStreamer<String, Long> stmr;

        @Override protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
            Ignition.setClientMode(true);
            ignite =
                Ignition.start("/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache1.xml");
            CacheConfiguration<String, Long> cfg2 =
                Ignition.loadSpringBean("/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache1.xml",
                    "cacheconf");
            cache = ignite.getOrCreateCache(cfg2);
            // BUG FIX: the original caught the exception, printed a message and
            // then dereferenced stmr anyway, guaranteeing an NPE. If the streamer
            // cannot be created the mapper cannot do its job — fail the task.
            try {
                stmr = ignite.dataStreamer("default");
            } catch (Exception e) {
                throw new IOException("Failed to create Ignite data streamer for cache 'default'", e);
            }
            stmr.allowOverwrite(true);
            // NOTE(review): this receiver is serialized and sent to the server
            // nodes for execution. The attached ClassNotFoundException
            // (ignite.WordCountExample$TokenizerMapper) means the server nodes
            // cannot resolve this class: either put the job JAR on every server
            // node's classpath, or enable peer class loading
            // (IgniteConfiguration.setPeerClassLoadingEnabled(true)) on all nodes.
            stmr.receiver(StreamTransformer.from((e, arg) -> {
                // Get current count.
                Long val = e.getValue();
                // Increment current count by 1.
                e.setValue(val == null ? 1L : val + 1);
                return null;
            }));
        }

        /**
         * Tokenizes the line and streams each token with an increment of 1.
         * The actual accumulation happens in the stream receiver on the
         * server nodes; nothing is written to the Hadoop context here.
         */
        public void map(Object key, Text value, Context context) throws
                IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            // Whilst iterating over the token iterator
            while (itr.hasMoreTokens()) {
                stmr.addData(itr.nextToken(), 1L);
            }
        }

        @Override protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // BUG FIX: the original called super.setup(context) here.
            super.cleanup(context);
            // Close the streamer first so buffered entries are flushed to the
            // cache before the client node is stopped.
            if (stmr != null) {
                stmr.close();
            }
            if (ignite != null) {
                ignite.close();
            }
        }
    }

    /**
     * Standard WordCount reducer/combiner: sums the counts for each word.
     */
    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context
                context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    /** Configures and submits the MapReduce job. */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        // BUG FIX: the original referenced WordCount.class — a different class.
        // The JAR must be located from THIS class or the mapper/reducer classes
        // won't be shipped with the job.
        job.setJarByClass(WordCountExample.class);
        // Set our Mapper class, conforming to the API
        job.setMapperClass(TokenizerMapper.class);
        // Set our Combiner & Reducer classes, conforming to the (same) API
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        // Set the output Key type
        job.setOutputKeyClass(Text.class);
        // Set the output Value type
        job.setOutputValueClass(IntWritable.class);
        // Set number of reducers
        job.setNumReduceTasks(10);
        // Get the input and output paths from the job arguments
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
17/06/27 17:52:17 ERROR datastreamer.DataStreamProcessor: Failed to
unmarshal message [nodeId=491340cd-aeb0-49ba-affc-36f188f3a8e5,
req=DataStreamerRequest [reqId=36, cacheName=default,
ignoreDepOwnership=true, skipStore=false, keepBinary=false, depMode=null,
sampleClsName=null, userVer=null, ldrParticipants=null, clsLdrId=null,
forceLocDep=true, topVer=AffinityTopologyVersion [topVer=9, minorTopVer=1]]]
class org.apache.ignite.IgniteCheckedException: Failed to unmarshal object
with optimized marshaller
at
org.apache.ignite.internal.util.IgniteUtils.unmarshal(IgniteUtils.java:9893)
at
org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:288)
at
org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:58)
at
org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:88)
at
org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1257)
at
org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:885)
at
org.apache.ignite.internal.managers.communication.GridIoManager.access$2100(GridIoManager.java:114)
at
org.apache.ignite.internal.managers.communication.GridIoManager$7.run(GridIoManager.java:802)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.binary.BinaryObjectException: Failed to
unmarshal object with optimized marshaller
at
org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1715)
at
org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize0(BinaryReaderExImpl.java:1944)
at
org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1704)
at
org.apache.ignite.internal.binary.GridBinaryMarshaller.deserialize(GridBinaryMarshaller.java:304)
at
org.apache.ignite.internal.binary.BinaryMarshaller.unmarshal0(BinaryMarshaller.java:99)
at
org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:82)
at
org.apache.ignite.internal.util.IgniteUtils.unmarshal(IgniteUtils.java:9887)
... 10 more
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to find
class with given class loader for unmarshalling (make sure same versions of
all classes are available on all nodes or enable peer-class-loading)
[clsLdr=sun.misc.Launcher$AppClassLoader@3941a79c,
cls=ignite.WordCountExample$TokenizerMapper]
at
org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller.unmarshal0(OptimizedMarshaller.java:230)
at
org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:94)
at
org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1712)
... 16 more
Caused by: java.lang.ClassNotFoundException:
ignite.WordCountExample$TokenizerMapper
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:8478)
at
org.apache.ignite.internal.MarshallerContextImpl.getClass(MarshallerContextImpl.java:340)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedMarshallerUtils.classDescriptor(OptimizedMarshallerUtils.java:268)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readClass(OptimizedObjectInputStream.java:349)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:301)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:367)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:491)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:579)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:927)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:324)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:367)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:491)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:579)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:927)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:324)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:367)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller.unmarshal0(OptimizedMarshaller.java:227)
... 18 more
Should I add something to the server?
/**
 * Starts a standalone Ignite server node, creates (or joins) the cache
 * described by the "cacheconf" Spring bean, and then blocks forever so the
 * node stays up until the process is killed.
 */
public class StartServer2 {
    public static void main(String args[]) throws Exception {
        Ignition.setClientMode(false);
        try (Ignite ignite =
                 Ignition.start("/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache1.xml")) {
            CacheConfiguration<String, Long> cfg2 =
                Ignition.loadSpringBean("/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache1.xml",
                    "cacheconf");
            IgniteCache<String, Long> cache = ignite.getOrCreateCache(cfg2);
            // BUG FIX: the original used `while(true){}`, a busy-wait that pins
            // a CPU core at 100%. Joining the current thread blocks forever
            // without consuming CPU; the node keeps running on its own threads.
            Thread.currentThread().join();
        }
    }
}
Thanks for support
Mimmo
2017-06-27 12:23 GMT+02:00 Mimmo Celano <[email protected]>:
> Hi,
> Thanks for your reply.
> We are testing ignite performances on hadoop without hadoop-accelerator
> for eventually use it in our project.
> We have a 700 MB file which is split into 6 map tasks. I don't think the
> setup time is expensive — it's about 6-7 seconds. The cache.put you
> mentioned is not executed because it is commented out.
> Hadoop computation time is 1:50 min with 3 parallel tasks, 1:57 min with
> ignite setup and without caching operation, 24:25 min with ignite setup and
> one cache.put(interno) at the end of every map task, 33:35 min with ignite
> setup and datastream for word caching. I don't think that these cache.put
> or datastream are so expensive, it's slower than we were thinking.
> The Nodes are connected each other within a 10Gbps lan, this may be a
> Bottleneck communication?
> Could we improve computation time with a server for each node and affinity
> collocation to write every word in local memory without tcp connection? Can
> we eventually use cache.get and tcp communication to get information of an
> inserted word?
>
> Thanks
>
> 2017-06-27 10:43 GMT+02:00 Michael Cherkasov <[email protected]>
> :
>
>> Hi Mimmo,
>>
>> How many map tasks do you have? if you have a lot of map tasks with small
>> amount of work you will spend almost all cpu time in setup method.
>>
>> Also if you have small amount of data, one network operation(
>> cache.put("interno", 666); ) can be very expensive operation relative to
>> the whole map task.
>>
>> I don't understand what you try to achieve, I think you have some
>> misunderstanding how to use ignite with hadoop.
>>
>> Please, read this doc: https://apacheignite-fs.readme
>> .io/docs/hadoop-accelerator
>> it explains how integrate ignite and hadoop together.
>>
>> You can use ignite's IGFS and use hadoop as secondary file system and
>> ignites implementation of job tracker that should improve performance in
>> your case.
>>
>> Thanks,
>> Mikhail.
>>
>> 2017-06-26 21:00 GMT+03:00 mimmo_c <[email protected]>:
>>
>>> Hi,
>>> Thanks to your suggestions I managed to configure Ignite properly.
>>> Everything works, but I found another issue... The computation is 20 or
>>> 30 times slower than the same computation without putting words in the
>>> cache. If I put just one word in the cache at the beginning of the map
>>> function, the computation time is the same. What could this depend on?
>>> This is the Server Code
>>>
>>> public static void main(String args[]) throws Exception {
>>> Ignition.setClientMode(false);
>>>
>>> try(Ignite ignite =
>>> Ignition.start("/home/hduser/apache-ignite-2.0.0-src/example
>>> s/config/example-cache1.xml")){
>>>
>>> //CacheConfiguration<String ,Integer> cfg2 = new
>>> CacheConfiguration<>();
>>> CacheConfiguration<String,Integer> cfg2=
>>> Ignition.loadSpringBean("/home/hduser/apache-ignite-2.0.0-sr
>>> c/examples/config/example-cache1.xml",
>>> "cacheconf");
>>>
>>>
>>> IgniteCache<String, Integer> cache;
>>> cache = ignite.getOrCreateCache(cfg2);
>>> //cache.put("uno", 1);
>>> //cache.put("due", 2);
>>>
>>> //System.out.println(cache.get("uno"));
>>> //System.out.println(cache.get("due"));
>>>
>>> while(true){
>>>
>>> }
>>> }
>>>
>>>
>>>
>>>
>>> }
>>>
>>> This is the WordCount code
>>>
>>> public class WordCountIgnite extends Configured implements Tool {
>>>
>>>
>>>
>>> public static void main(String args[]) throws Exception {
>>>
>>>
>>> int res = ToolRunner.run(new WordCountIgnite(), args);
>>> System.exit(res);
>>>
>>>
>>> }
>>>
>>> public int run(String[] args) throws Exception {
>>>
>>> Path inputPath = new Path(args[0]);
>>> Path outputPath = new Path(args[1]);
>>>
>>> Configuration conf = getConf();
>>> Job job = Job.getInstance(conf, "word count");
>>>
>>> FileInputFormat.setInputPaths(job, inputPath);
>>> FileOutputFormat.setOutputPath(job, outputPath);
>>>
>>> job.setJarByClass(this.getClass());
>>> job.setInputFormatClass(TextInputFormat.class);
>>> job.setOutputFormatClass(TextOutputFormat.class);
>>> job.setMapOutputKeyClass(Text.class);
>>> job.setMapOutputValueClass(IntWritable.class);
>>> job.setOutputKeyClass(Text.class);
>>> job.setOutputValueClass(IntWritable.class);
>>>
>>> job.setMapperClass(Map.class);
>>> job.setCombinerClass(Reduce.class);
>>> job.setReducerClass(Reduce.class);
>>>
>>> return job.waitForCompletion(true) ? 0 : 1;
>>> }
>>>
>>> public static class Map extends Mapper<LongWritable, Text, Text,
>>> IntWritable> {
>>>
>>> private final static IntWritable one = new
>>> IntWritable(1);
>>> private Text word = new Text();
>>> Ignite ignite;
>>> IgniteCache<String, Integer> cache;
>>>
>>> @Override protected void setup(Context context) throws
>>> IOException,
>>> InterruptedException {
>>> super.setup(context);
>>>
>>> ignite =
>>> Ignition.start("/home/hduser/apache-ignite-2.0.0-src/example
>>> s/config/example-cache2.xml");
>>>
>>> //CacheConfiguration<String ,Integer> cfg2 = new
>>> CacheConfiguration<>();
>>> CacheConfiguration<String,Integer> cfg2=
>>> Ignition.loadSpringBean("/home/hduser/apache-ignite-2.0.0-sr
>>> c/examples/config/example-cache1.xml",
>>> "cacheconf");
>>>
>>>
>>> cache = ignite.getOrCreateCache(cfg2);
>>> //cache.put("test", 1993);
>>>
>>> }
>>>
>>> @Override
>>> public void map(LongWritable key, Text value, Context
>>> context) throws
>>> IOException, InterruptedException {
>>> // String line = value.toString();
>>> // StringTokenizer tokenizer = new
>>> StringTokenizer(line);
>>> // while (tokenizer.hasMoreTokens()) {
>>> // word.set(tokenizer.nextToken());
>>> // context.write(word, one);
>>> // }
>>>
>>>
>>> String[] lines = tokenize(value.toString());
>>>
>>> try (IgniteDataStreamer<String, Integer> stmr =
>>> ignite.dataStreamer("Cache")) {
>>> // Stream entries.
>>> for(String token : lines){
>>> //word.set(token);
>>> //context.write(word, one);
>>> stmr.addData(token, 1);
>>> }
>>> }
>>>
>>>
>>>
>>> //cache.put("interno", 666);
>>> }
>>>
>>> @Override protected void cleanup(Context context) throws
>>> IOException,
>>> InterruptedException {
>>> super.setup(context);
>>>
>>> ignite.close();
>>>
>>> }
>>>
>>> private String[] tokenize(String text) {
>>> text = text.toLowerCase();
>>> text = text.replace("'","");
>>> text = text.replaceAll("[\\s\\W]+", "
>>> ").trim();
>>> return text.split(" ");
>>> }
>>> }
>>>
>>> public static class Reduce extends Reducer<Text, IntWritable,
>>> Text,
>>> IntWritable> {
>>>
>>> @Override
>>> public void reduce(Text key, Iterable<IntWritable>
>>> values, Context
>>> context) throws IOException, InterruptedException {
>>> int sum = 0;
>>> for (IntWritable value : values) {
>>> sum += value.get();
>>> }
>>>
>>> context.write(key, new IntWritable(sum));
>>> }
>>>
>>> }
>>>
>>>
>>> }
>>>
>>> This is the Ignite Configuration
>>>
>>> <beans xmlns="http://www.springframework.org/schema/beans"
>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>> xsi:schemaLocation="
>>> http://www.springframework.org/schema/beans
>>> http://www.springframework.org/schema/beans/spring-beans.xsd">
>>> <bean id="ignite.cfg"
>>> class="org.apache.ignite.configuration.IgniteConfiguration">
>>>
>>>
>>> <property name="discoverySpi">
>>> <bean
>>> class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>>> <property name="ipFinder">
>>>
>>>
>>>
>>> <bean
>>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicas
>>> t.TcpDiscoveryMulticastIpFinder">
>>> <property name="addresses">
>>> <list>
>>>
>>> <value>127.0.0.1:47500..47509</value>
>>> <value>192.168.30.5:47500..47509</value>
>>> <value>192.168.30.22:47500..47
>>> 509</value>
>>> <value>192.168.30.99:47500..47
>>> 509</value>
>>> </list>
>>> </property>
>>> </bean>
>>> </property>
>>> </bean>
>>> </property>
>>> </bean>
>>>
>>> <bean id="cacheconf"
>>> class="org.apache.ignite.configuration.CacheConfiguration">
>>> <property name="name" value="default"/>
>>> <property name="atomicityMode" value="ATOMIC"/>
>>> <property name="backups" value="0"/>
>>> <property name="cacheMode" value="PARTITIONED"/>
>>>
>>> </bean>
>>> </beans>
>>>
>>>
>>> According to Ignite Documentation
>>> https://apacheignite.readme.io/docs/performance-tips I think maybe is a
>>> problem of cache start dimension but i can't set it. I'm using Ignite
>>> 2.0.
>>> When I add <property name="startSize" value="10"/> in configuration it
>>> launch exception
>>>
>>> Caused by: org.springframework.beans.NotWritablePropertyException:
>>> Invalid
>>> property 'startSize' of bean class
>>> [org.apache.ignite.configuration.CacheConfiguration]: Bean property
>>> 'startSize' is not writable or has an invalid setter method. Does the
>>> parameter type of the setter match the return type of the getter?
>>>
>>> I don't know why.... I tried with the cache.put() method but it is
>>> slower. Are these computation times normal?
>>>
>>> I think the times should be similar to those of a normal hadoop
>>> computation.
>>> The test are made by 2 slave who are Ignite server Node too, in Hadoop
>>> computation create 2 Client node in there same slave who put data in
>>> cache.
>>>
>>> Thanks
>>> Mimmo
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-ignite-users.705
>>> 18.x6.nabble.com/Performance-WordCount-to-Hadoop-tp14084.html
>>> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>>>
>>
>>
>