Hi Mimmo,

You need to deploy the class that defines your stream transformer, e.g. stmr.deployClass(StreamTransformerExample.class), or make sure the same classes are available on every node in the cluster, for example by enabling peer class loading. A sketch of both options follows.
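A minimal sketch of both options, assuming the "default" cache from your example-cache1.xml; the class name DeployReceiverSketch is just a placeholder:

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.stream.StreamTransformer;

public class DeployReceiverSketch {
    public static void main(String[] args) {
        // Option 1: enable peer class loading so that server nodes can fetch
        // the receiver class from this client. The server nodes need the same
        // setting in their configuration, e.g. in example-cache1.xml:
        //   <property name="peerClassLoadingEnabled" value="true"/>
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setPeerClassLoadingEnabled(true);
        cfg.setClientMode(true);

        try (Ignite ignite = Ignition.start(cfg)) {
            ignite.getOrCreateCache("default"); // make sure the cache exists

            try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer("default")) {
                stmr.allowOverwrite(true);

                // Option 2: explicitly deploy a class from the JAR that holds
                // the receiver, so it is shipped along with the streamer
                // requests. In your code that would be
                // WordCountExample.TokenizerMapper, since the receiver lambda
                // is declared inside it.
                stmr.deployClass(DeployReceiverSketch.class);

                stmr.receiver(StreamTransformer.from((e, arg) -> {
                    Long val = e.getValue();
                    e.setValue(val == null ? 1L : val + 1);
                    return null;
                }));

                stmr.addData("hello", 1L);
            }
        }
    }
}

Either way, the server side must be able to resolve ignite.WordCountExample$TokenizerMapper when it unmarshals the streamer request, so if you go the peer-class-loading route, enable it on the server nodes too.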
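Separately, on the timing numbers further down in this thread: one thing that stands out in the WordCountIgnite mapper is that a new IgniteDataStreamer is opened and closed inside every map() call, so every input line pays for a full streamer flush and close. Below is a sketch of keeping a single streamer per mapper instance instead, opened in setup() and closed in cleanup(); the path and the "default" cache name are taken from your snippets, and whether this accounts for the whole slowdown is only a guess on my part:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;

// Sketch: one Ignite client and one data streamer per mapper instance.
public class StreamingMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private Ignite ignite;
    private IgniteDataStreamer<String, Long> stmr;

    @Override protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        Ignition.setClientMode(true);
        ignite = Ignition.start("/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache1.xml");
        // One streamer for the whole task; entries are buffered and sent in batches.
        stmr = ignite.dataStreamer("default");
        // If you also want the counting receiver from your other example,
        // set allowOverwrite(true) and stmr.receiver(...) here, once.
    }

    @Override public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens())
            stmr.addData(itr.nextToken(), 1L); // buffered; no flush per input line
    }

    @Override protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context); // note: the posted code calls super.setup() here
        stmr.close();           // flushes the remaining buffered entries
        ignite.close();
    }
}

Also double-check the cache name passed to dataStreamer(): the configuration registers the cache as "default", while the WordCountIgnite mapper asks for "Cache".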
Thanks,
Mikhail.

2017-06-27 20:55 GMT+03:00 Mimmo Celano <[email protected]>:

> Hi, thanks to all.
> Sorry for the mistake: in this WordCount the input was split into lines, so
> every line calls the map function. That was the problem, and I changed the code.
> When I run WordCount without the stream receiver that updates words in the cache,
> everything works fine, but when I add the stream receiver I get this error.
>
> public class WordCountExample {
>
>     public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
>         // Making objects is expensive. Instantiate outside the loop and re-use
>         private final static IntWritable one = new IntWritable(1);
>         private Text word = new Text();
>         Ignite ignite;
>         IgniteCache<String, Long> cache;
>         IgniteDataStreamer<String, Long> stmr;
>
>         @Override protected void setup(Context context) throws IOException, InterruptedException {
>             super.setup(context);
>             Ignition.setClientMode(true);
>             ignite = Ignition.start("/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache1.xml");
>             CacheConfiguration<String, Long> cfg2 = Ignition.loadSpringBean(
>                 "/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache1.xml", "cacheconf");
>             cache = ignite.getOrCreateCache(cfg2);
>             try {
>                 stmr = ignite.dataStreamer("default");
>             } catch (Exception e) {
>                 System.out.println("Error DataStream");
>             }
>             stmr.allowOverwrite(true);
>             stmr.receiver(StreamTransformer.from((e, arg) -> { // Probably the error
>                 // Get current count.
>                 Long val = e.getValue();
>
>                 // Increment current count by 1.
>                 e.setValue(val == null ? 1L : val + 1);
>
>                 return null;
>             }));
>         }
>
>         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
>             StringTokenizer itr = new StringTokenizer(value.toString());
>
>             // Whilst iterating over the token iterator
>             while (itr.hasMoreTokens()) {
>                 stmr.addData(itr.nextToken(), 1L);
>             }
>         }
>
>         @Override protected void cleanup(Context context) throws IOException, InterruptedException {
>             super.setup(context);
>
>             ignite.close();
>         }
>     }
>
>     public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
>         private IntWritable result = new IntWritable();
>
>         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
>             int sum = 0;
>
>             for (IntWritable val : values) {
>                 sum += val.get();
>             }
>
>             result.set(sum);
>             context.write(key, result);
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         Job job = Job.getInstance(conf, "word count");
>
>         // Make this class the main in the JAR file
>         job.setJarByClass(WordCount.class);
>
>         // Set our Mapper class, conforming to the API
>         job.setMapperClass(TokenizerMapper.class);
>
>         // Set our Combiner & Reducer classes, conforming to the (same) API
>         job.setCombinerClass(IntSumReducer.class);
>         job.setReducerClass(IntSumReducer.class);
>
>         // Set the output Key type
>         job.setOutputKeyClass(Text.class);
>
>         // Set the output Value type
>         job.setOutputValueClass(IntWritable.class);
>
>         // Set number of reducers
>         job.setNumReduceTasks(10);
>
>         // Get the input and output paths from the job arguments
>         FileInputFormat.addInputPath(job, new Path(args[0]));
>         FileOutputFormat.setOutputPath(job, new Path(args[1]));
>
>         System.exit(job.waitForCompletion(true) ?
0 : 1); > } > } > > > 17/06/27 17:52:17 ERROR datastreamer.DataStreamProcessor: Failed to > unmarshal message [nodeId=491340cd-aeb0-49ba-affc-36f188f3a8e5, > req=DataStreamerRequest [reqId=36, cacheName=default, > ignoreDepOwnership=true, skipStore=false, keepBinary=false, depMode=null, > sampleClsName=null, userVer=null, ldrParticipants=null, clsLdrId=null, > forceLocDep=true, topVer=AffinityTopologyVersion [topVer=9, minorTopVer=1]]] > class org.apache.ignite.IgniteCheckedException: Failed to unmarshal > object with optimized marshaller > at org.apache.ignite.internal.util.IgniteUtils.unmarshal( > IgniteUtils.java:9893) > at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor. > processRequest(DataStreamProcessor.java:288) > at org.apache.ignite.internal.processors.datastreamer. > DataStreamProcessor.access$000(DataStreamProcessor.java:58) > at org.apache.ignite.internal.processors.datastreamer. > DataStreamProcessor$1.onMessage(DataStreamProcessor.java:88) > at org.apache.ignite.internal.managers.communication. > GridIoManager.invokeListener(GridIoManager.java:1257) > at org.apache.ignite.internal.managers.communication.GridIoManager. > processRegularMessage0(GridIoManager.java:885) > at org.apache.ignite.internal.managers.communication. > GridIoManager.access$2100(GridIoManager.java:114) > at org.apache.ignite.internal.managers.communication.GridIoManager$7.run( > GridIoManager.java:802) > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1142) > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: class org.apache.ignite.binary.BinaryObjectException: Failed > to unmarshal object with optimized marshaller > at org.apache.ignite.internal.binary.BinaryUtils. > doReadOptimized(BinaryUtils.java:1715) > at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize0( > BinaryReaderExImpl.java:1944) > at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize( > BinaryReaderExImpl.java:1704) > at org.apache.ignite.internal.binary.GridBinaryMarshaller.deserialize( > GridBinaryMarshaller.java:304) > at org.apache.ignite.internal.binary.BinaryMarshaller. > unmarshal0(BinaryMarshaller.java:99) > at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal( > AbstractNodeNameAwareMarshaller.java:82) > at org.apache.ignite.internal.util.IgniteUtils.unmarshal( > IgniteUtils.java:9887) > ... 10 more > Caused by: class org.apache.ignite.IgniteCheckedException: Failed to find > class with given class loader for unmarshalling (make sure same versions of > all classes are available on all nodes or enable peer-class-loading) > [clsLdr=sun.misc.Launcher$AppClassLoader@3941a79c, > cls=ignite.WordCountExample$TokenizerMapper] > at org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller. > unmarshal0(OptimizedMarshaller.java:230) > at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal( > AbstractNodeNameAwareMarshaller.java:94) > at org.apache.ignite.internal.binary.BinaryUtils. > doReadOptimized(BinaryUtils.java:1712) > ... 
16 more > Caused by: java.lang.ClassNotFoundException: ignite.WordCountExample$ > TokenizerMapper > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at org.apache.ignite.internal.util.IgniteUtils.forName( > IgniteUtils.java:8478) > at org.apache.ignite.internal.MarshallerContextImpl.getClass( > MarshallerContextImpl.java:340) > at org.apache.ignite.internal.marshaller.optimized. > OptimizedMarshallerUtils.classDescriptor(OptimizedMarshallerUtils.java: > 268) > at org.apache.ignite.internal.marshaller.optimized. > OptimizedObjectInputStream.readClass(OptimizedObjectInputStream.java:349) > at org.apache.ignite.internal.marshaller.optimized. > OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream. > java:301) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:367) > at org.apache.ignite.internal.marshaller.optimized. > OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:491) > at org.apache.ignite.internal.marshaller.optimized. > OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream. > java:579) > at org.apache.ignite.internal.marshaller.optimized. > OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:927) > at org.apache.ignite.internal.marshaller.optimized. > OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream. > java:324) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:367) > at org.apache.ignite.internal.marshaller.optimized. > OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:491) > at org.apache.ignite.internal.marshaller.optimized. > OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream. > java:579) > at org.apache.ignite.internal.marshaller.optimized. > OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:927) > at org.apache.ignite.internal.marshaller.optimized. > OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream. > java:324) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:367) > at org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller. > unmarshal0(OptimizedMarshaller.java:227) > ... 18 more > > > Should I add something to the server? > > public class StartServer2{ > > public static void main(String args[]) throws Exception { > Ignition.setClientMode(false); > try(Ignite ignite = Ignition.start("/home/hduser/apache-ignite-2.0.0-src/ > examples/config/example-cache1.xml")){ > CacheConfiguration<String,Long> cfg2= Ignition.loadSpringBean("/ > home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache1.xml", > "cacheconf"); > IgniteCache<String, Long> cache; > cache = ignite.getOrCreateCache(cfg2); > while(true){ > } > } > } > > } > > > Thanks for support > Mimmo > > 2017-06-27 12:23 GMT+02:00 Mimmo Celano <[email protected]>: > >> Hi, >> Thanks for your reply. >> We are testing ignite performances on hadoop without hadoop-accelerator >> for eventually use it in our project. >> We have a file of 700mb wich is spitted in 6 map tasks, I think that >> setup time it's not expensive, it's 6-7 seconds. The cache.put which you >> have mentioned is not computing because it' a comment. 
>> Hadoop computation time is 1:50 min with 3 parallel tasks, 1:57 min with
>> Ignite setup and no caching operations, 24:25 min with Ignite setup and
>> one cache.put("interno", 666) at the end of every map task, and 33:35 min
>> with Ignite setup and a data streamer for word caching. I wouldn't expect
>> these cache.put or data streamer calls to be that expensive; it's slower
>> than we were expecting.
>> The nodes are connected to each other over a 10 Gbps LAN; could the network
>> be a communication bottleneck?
>> Could we improve the computation time by running a server on each node and
>> using affinity collocation, so that every word is written to local memory
>> without a TCP round trip? And could we then use cache.get (over TCP) to
>> read back information about an inserted word?
>>
>> Thanks
>>
>> 2017-06-27 10:43 GMT+02:00 Michael Cherkasov <[email protected]>:
>>
>>> Hi Mimmo,
>>>
>>> How many map tasks do you have? If you have a lot of map tasks that each do
>>> a small amount of work, you will spend almost all CPU time in the setup method.
>>>
>>> Also, if you have a small amount of data, one network operation
>>> (cache.put("interno", 666);) can be very expensive relative to the whole map task.
>>>
>>> I don't understand what you are trying to achieve; I think you have some
>>> misunderstanding about how to use Ignite with Hadoop.
>>>
>>> Please read this doc: https://apacheignite-fs.readme.io/docs/hadoop-accelerator
>>> It explains how to integrate Ignite and Hadoop.
>>>
>>> You can use Ignite's IGFS with Hadoop as the secondary file system, plus
>>> Ignite's implementation of the job tracker, which should improve performance
>>> in your case.
>>>
>>> Thanks,
>>> Mikhail.
>>>
>>> 2017-06-26 21:00 GMT+03:00 mimmo_c <[email protected]>:
>>>
>>>> Hi,
>>>> Thanks to your suggestions I managed to configure Ignite properly. Everything
>>>> works, but I found another issue... The computation is 20 or 30 times slower
>>>> than the same computation without putting words in the cache. If I put just
>>>> one word in the cache at the beginning of the map function, the computation
>>>> time is the same. What could this depend on?
>>>> This is the Server Code >>>> >>>> public static void main(String args[]) throws Exception { >>>> Ignition.setClientMode(false); >>>> >>>> try(Ignite ignite = >>>> Ignition.start("/home/hduser/apache-ignite-2.0.0-src/example >>>> s/config/example-cache1.xml")){ >>>> >>>> //CacheConfiguration<String ,Integer> cfg2 = >>>> new CacheConfiguration<>(); >>>> CacheConfiguration<String,Integer> cfg2= >>>> Ignition.loadSpringBean("/home/hduser/apache-ignite-2.0.0-sr >>>> c/examples/config/example-cache1.xml", >>>> "cacheconf"); >>>> >>>> >>>> IgniteCache<String, Integer> cache; >>>> cache = ignite.getOrCreateCache(cfg2); >>>> //cache.put("uno", 1); >>>> //cache.put("due", 2); >>>> >>>> //System.out.println(cache.get("uno")); >>>> //System.out.println(cache.get("due")); >>>> >>>> while(true){ >>>> >>>> } >>>> } >>>> >>>> >>>> >>>> >>>> } >>>> >>>> This is the WordCount code >>>> >>>> public class WordCountIgnite extends Configured implements Tool { >>>> >>>> >>>> >>>> public static void main(String args[]) throws Exception { >>>> >>>> >>>> int res = ToolRunner.run(new WordCountIgnite(), args); >>>> System.exit(res); >>>> >>>> >>>> } >>>> >>>> public int run(String[] args) throws Exception { >>>> >>>> Path inputPath = new Path(args[0]); >>>> Path outputPath = new Path(args[1]); >>>> >>>> Configuration conf = getConf(); >>>> Job job = Job.getInstance(conf, "word count"); >>>> >>>> FileInputFormat.setInputPaths(job, inputPath); >>>> FileOutputFormat.setOutputPath(job, outputPath); >>>> >>>> job.setJarByClass(this.getClass()); >>>> job.setInputFormatClass(TextInputFormat.class); >>>> job.setOutputFormatClass(TextOutputFormat.class); >>>> job.setMapOutputKeyClass(Text.class); >>>> job.setMapOutputValueClass(IntWritable.class); >>>> job.setOutputKeyClass(Text.class); >>>> job.setOutputValueClass(IntWritable.class); >>>> >>>> job.setMapperClass(Map.class); >>>> job.setCombinerClass(Reduce.class); >>>> job.setReducerClass(Reduce.class); >>>> >>>> return job.waitForCompletion(true) ? 0 : 1; >>>> } >>>> >>>> public static class Map extends Mapper<LongWritable, Text, Text, >>>> IntWritable> { >>>> >>>> private final static IntWritable one = new >>>> IntWritable(1); >>>> private Text word = new Text(); >>>> Ignite ignite; >>>> IgniteCache<String, Integer> cache; >>>> >>>> @Override protected void setup(Context context) throws >>>> IOException, >>>> InterruptedException { >>>> super.setup(context); >>>> >>>> ignite = >>>> Ignition.start("/home/hduser/apache-ignite-2.0.0-src/example >>>> s/config/example-cache2.xml"); >>>> >>>> //CacheConfiguration<String ,Integer> cfg2 = new >>>> CacheConfiguration<>(); >>>> CacheConfiguration<String,Integer> cfg2= >>>> Ignition.loadSpringBean("/home/hduser/apache-ignite-2.0.0-sr >>>> c/examples/config/example-cache1.xml", >>>> "cacheconf"); >>>> >>>> >>>> cache = ignite.getOrCreateCache(cfg2); >>>> //cache.put("test", 1993); >>>> >>>> } >>>> >>>> @Override >>>> public void map(LongWritable key, Text value, Context >>>> context) throws >>>> IOException, InterruptedException { >>>> // String line = value.toString(); >>>> // StringTokenizer tokenizer = new >>>> StringTokenizer(line); >>>> // while (tokenizer.hasMoreTokens()) { >>>> // word.set(tokenizer.nextToken()); >>>> // context.write(word, one); >>>> // } >>>> >>>> >>>> String[] lines = tokenize(value.toString()); >>>> >>>> try (IgniteDataStreamer<String, Integer> stmr = >>>> ignite.dataStreamer("Cache")) { >>>> // Stream entries. 
>>>> for(String token : lines){ >>>> //word.set(token); >>>> //context.write(word, one); >>>> stmr.addData(token, 1); >>>> } >>>> } >>>> >>>> >>>> >>>> //cache.put("interno", 666); >>>> } >>>> >>>> @Override protected void cleanup(Context context) >>>> throws IOException, >>>> InterruptedException { >>>> super.setup(context); >>>> >>>> ignite.close(); >>>> >>>> } >>>> >>>> private String[] tokenize(String text) { >>>> text = text.toLowerCase(); >>>> text = text.replace("'",""); >>>> text = text.replaceAll("[\\s\\W]+", " >>>> ").trim(); >>>> return text.split(" "); >>>> } >>>> } >>>> >>>> public static class Reduce extends Reducer<Text, IntWritable, >>>> Text, >>>> IntWritable> { >>>> >>>> @Override >>>> public void reduce(Text key, Iterable<IntWritable> >>>> values, Context >>>> context) throws IOException, InterruptedException { >>>> int sum = 0; >>>> for (IntWritable value : values) { >>>> sum += value.get(); >>>> } >>>> >>>> context.write(key, new IntWritable(sum)); >>>> } >>>> >>>> } >>>> >>>> >>>> } >>>> >>>> This is the Ignite Configuration >>>> >>>> <beans xmlns="http://www.springframework.org/schema/beans" >>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" >>>> xsi:schemaLocation=" >>>> http://www.springframework.org/schema/beans >>>> http://www.springframework.org/schema/beans/spring-beans.xsd"> >>>> <bean id="ignite.cfg" >>>> class="org.apache.ignite.configuration.IgniteConfiguration"> >>>> >>>> >>>> <property name="discoverySpi"> >>>> <bean >>>> class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> >>>> <property name="ipFinder"> >>>> >>>> >>>> >>>> <bean >>>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicas >>>> t.TcpDiscoveryMulticastIpFinder"> >>>> <property name="addresses"> >>>> <list> >>>> >>>> <value>127.0.0.1:47500..47509</value> >>>> <value>192.168.30.5:47500..475 >>>> 09</value> >>>> <value>192.168.30.22:47500..47 >>>> 509</value> >>>> <value>192.168.30.99:47500..47 >>>> 509</value> >>>> </list> >>>> </property> >>>> </bean> >>>> </property> >>>> </bean> >>>> </property> >>>> </bean> >>>> >>>> <bean id="cacheconf" >>>> class="org.apache.ignite.configuration.CacheConfiguration"> >>>> <property name="name" value="default"/> >>>> <property name="atomicityMode" value="ATOMIC"/> >>>> <property name="backups" value="0"/> >>>> <property name="cacheMode" value="PARTITIONED"/> >>>> >>>> </bean> >>>> </beans> >>>> >>>> >>>> According to Ignite Documentation >>>> https://apacheignite.readme.io/docs/performance-tips I think maybe is >>>> a >>>> problem of cache start dimension but i can't set it. I'm using Ignite >>>> 2.0. >>>> When I add <property name="startSize" value="10"/> in configuration it >>>> launch exception >>>> >>>> Caused by: org.springframework.beans.NotWritablePropertyException: >>>> Invalid >>>> property 'startSize' of bean class >>>> [org.apache.ignite.configuration.CacheConfiguration]: Bean property >>>> 'startSize' is not writable or has an invalid setter method. Does the >>>> parameter type of the setter match the return type of the getter? >>>> >>>> I don't know why.... I tried with cache.put() method but it is slower. >>>> Are >>>> they normale this computation times? >>>> >>>> I think the times should be similar to those of a normal hadoop >>>> computation. >>>> The test are made by 2 slave who are Ignite server Node too, in Hadoop >>>> computation create 2 Client node in there same slave who put data in >>>> cache. 
>>>>
>>>> Thanks
>>>> Mimmo
>>>
>>>
>>
>
