iswarezwp opened a new issue, #11392:
URL: https://github.com/apache/ignite/issues/11392

   ## issue
   
   When I use `IgniteDataStreamer` with a user-defined class, like this:
   ```
   cfg.setPeerClassLoadingEnabled(true);
   Ignite ignite = Ignition.start(cfg);
   
   IgniteDataStreamer<String, MyDouble> mktStmr = ignite.dataStreamer(mktCache.getName());
   ```
   
   I get the following error. It seems that the `MyDouble` class is not sent to the remote node:
   ```
   SEVERE: DataStreamer operation failed.
   class org.apache.ignite.IgniteCheckedException: Failed to finish operation (too many remaps): 32
           at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5.apply(DataStreamerImpl.java:977)
           at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5.apply(DataStreamerImpl.java:942)
           at org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListener(GridFutureAdapter.java:474)
           at org.apache.ignite.internal.util.future.GridFutureAdapter.unblock(GridFutureAdapter.java:350)
           at org.apache.ignite.internal.util.future.GridFutureAdapter.unblockAll(GridFutureAdapter.java:338)
           at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:586)
           at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:565)
           at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:553)
           at org.apache.ignite.internal.util.future.GridCompoundFuture.apply(GridCompoundFuture.java:132)
           at org.apache.ignite.internal.util.future.GridCompoundFuture.apply(GridCompoundFuture.java:46)
           at org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListener(GridFutureAdapter.java:474)
           at org.apache.ignite.internal.util.future.GridFutureAdapter.unblock(GridFutureAdapter.java:350)
           at org.apache.ignite.internal.util.future.GridFutureAdapter.unblockAll(GridFutureAdapter.java:338)
           at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:586)
           at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:565)
           at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer.onResponse(DataStreamerImpl.java:2121)
           at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$3.onMessage(DataStreamerImpl.java:368)
           at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1906)
           at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1527)
           at org.apache.ignite.internal.managers.communication.GridIoManager.access$5300(GridIoManager.java:242)
           at org.apache.ignite.internal.managers.communication.GridIoManager$9.execute(GridIoManager.java:1420)
           at org.apache.ignite.internal.managers.communication.TraceRunnable.run(TraceRunnable.java:55)
           at org.apache.ignite.internal.util.StripedExecutor$Stripe.body(StripedExecutor.java:637)
           at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:125)
           at java.lang.Thread.run(Thread.java:750)
   Caused by: class org.apache.ignite.IgniteCheckedException: DataStreamer request failed [node=4167fc2a-b0c4-45a7-bf8e-52a97457ad34]
           at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer.onResponse(DataStreamerImpl.java:2112)
           ... 9 more
   Caused by: class org.apache.ignite.binary.BinaryInvalidTypeException: com.nimblex.StreamVisitorExample$MyDouble
           at org.apache.ignite.internal.binary.BinaryContext.descriptorForTypeId(BinaryContext.java:741)
           at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize0(BinaryReaderExImpl.java:1772)
           at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1731)
           at org.apache.ignite.internal.binary.BinaryObjectImpl.deserializeValue(BinaryObjectImpl.java:866)
           at org.apache.ignite.internal.binary.BinaryObjectImpl.value(BinaryObjectImpl.java:198)
           at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinary(CacheObjectUtils.java:199)
           at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinaryIfNeeded(CacheObjectUtils.java:78)
           at org.apache.ignite.internal.processors.cache.CacheObjectContext.unwrapBinaryIfNeeded(CacheObjectContext.java:138)
           at org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry$1.getValue(DataStreamerEntry.java:96)
           at com.nimblex.StreamVisitorExample$DeviceStreamReceiver.receive(StreamVisitorExample.java:143)
           at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:141)
           at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:394)
           at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:299)
           at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:60)
           at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:90)
           ... 8 more
   Caused by: java.lang.ClassNotFoundException: com.nimblex.StreamVisitorExample$MyDouble
           at java.net.URLClassLoader.findClass(URLClassLoader.java:407)
           at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
           at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
           at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
           at java.lang.Class.forName0(Native Method)
           at java.lang.Class.forName(Class.java:348)
           at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:9373)
           at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:9311)
           at org.apache.ignite.internal.MarshallerContextImpl.getClass(MarshallerContextImpl.java:384)
           at org.apache.ignite.internal.binary.BinaryContext.descriptorForTypeId(BinaryContext.java:717)
           ... 22 more
   ```
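   
   As a side note on reading the trace above: the outer "too many remaps" error only wraps the real problem, and the innermost `Caused by` is a `ClassNotFoundException` for `MyDouble` on the remote node. The following is a minimal, Ignite-free sketch of how such a cause chain can be walked programmatically. `CauseChainSketch`, `findCause`, `simulateFailure` and the class name `com.example.NotOnThisClasspath` are hypothetical names made up for illustration. The wrapper exceptions only mimic the nesting seen in the trace and are not Ignite's own types.
   
   ```java
   public class CauseChainSketch {
       /**
        * Walks the cause chain of {@code t} and returns the first throwable
        * of the requested type, or {@code null} if the chain contains none.
        */
       static <T extends Throwable> T findCause(Throwable t, Class<T> type) {
           for (Throwable cur = t; cur != null; cur = cur.getCause()) {
               if (type.isInstance(cur))
                   return type.cast(cur);
           }
   
           return null;
       }
   
       /**
        * Mimics a node that does not have the given class available:
        * the lookup fails and the failure is wrapped twice, similar to
        * the nesting in the reported trace (assumption: illustration only).
        */
       static Exception simulateFailure(String clsName) {
           try {
               Class.forName(clsName);
   
               return null; // The class was found, nothing to report.
           }
           catch (ClassNotFoundException e) {
               RuntimeException invalidType = new RuntimeException("Invalid type: " + clsName, e);
   
               return new IllegalStateException("Request failed", invalidType);
           }
       }
   
       public static void main(String[] args) {
           // Hypothetical class name that is not expected to be on the classpath.
           Exception err = simulateFailure("com.example.NotOnThisClasspath");
   
           ClassNotFoundException cnf = findCause(err, ClassNotFoundException.class);
   
           System.out.println("Missing class: " + (cnf == null ? "none" : cnf.getMessage()));
       }
   }
   ```
   
   Searching for a specific exception type, rather than simply taking the last cause, keeps the helper usable when the chain carries further nested causes.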
   
   
   ## code
   The code is modified from https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
   
   ```java
   /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements.  See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License.  You may obtain a copy of the License at
    *
    *      http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
   
   package com.nimblex;
   
   import java.io.Serializable;
   import java.util.Arrays;
   import java.util.Collection;
   import java.util.List;
   import java.util.Map;
   import java.util.Random;
   import java.util.concurrent.atomic.AtomicLong;
   
   import org.apache.ignite.Ignite;
   import org.apache.ignite.IgniteCache;
   import org.apache.ignite.IgniteDataStreamer;
   import org.apache.ignite.Ignition;
   import org.apache.ignite.cache.query.SqlFieldsQuery;
   import org.apache.ignite.cache.query.annotations.QuerySqlField;
   import org.apache.ignite.configuration.CacheConfiguration;
   import org.apache.ignite.configuration.DeploymentMode;
   import org.apache.ignite.configuration.IgniteConfiguration;
   import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
   import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
   import org.apache.ignite.stream.StreamReceiver;
   import org.apache.ignite.stream.StreamVisitor;
   
   
   public class StreamVisitorExample {
       /** Random number generator. */
       private static final Random RAND = new Random();
   
       /** The list of instruments. */
       private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"};
   
       /** The list of initial instrument prices. */
       private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50};
   
       public static void main(String[] args) throws Exception {
   
           TcpDiscoverySpi spi = new TcpDiscoverySpi();
           // create a new instance of tcp discovery multicast ip finder
           TcpDiscoveryMulticastIpFinder tcMp = new TcpDiscoveryMulticastIpFinder();
           tcMp.setAddresses(Arrays.asList("192.168.8.63")); // change your IP address here
           // set the multi cast ip finder for spi
           spi.setIpFinder(tcMp);
           // create new ignite configuration
           IgniteConfiguration cfg = new IgniteConfiguration();
           cfg.setClientMode(true);
           // set the discovery spi to ignite configuration
           cfg.setDiscoverySpi(spi);
   
           // fixbug: Remote node has deployment mode different from local
           cfg.setPeerClassLoadingEnabled(true);
           // cfg.setDeploymentMode(DeploymentMode.CONTINUOUS);
   
           try (Ignite ignite = Ignition.start(cfg)) {
   
               // Market data cache with default configuration.
               CacheConfiguration<String, MyDouble> mktDataCfg = new CacheConfiguration<>("marketTicks");
   
               // Financial instrument cache configuration.
               CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instCache");
   
               // Index key and value for querying financial instruments.
               // Note that Instrument class has @QuerySqlField annotation for secondary field indexing.
               instCfg.setIndexedTypes(String.class, Instrument.class);
   
               // Auto-close caches at the end of the example.
               try (
                   IgniteCache<String, MyDouble> mktCache = ignite.getOrCreateCache(mktDataCfg);
                   IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg)
               ) {
                   try (IgniteDataStreamer<String, MyDouble> mktStmr = ignite.dataStreamer(mktCache.getName())) {
                       // Note that we receive market data, but do not populate 'mktCache' (it remains empty).
                       // Instead we update the instruments in the 'instCache'.
                       // Since both, 'instCache' and 'mktCache' use the same key, updates are collocated.
                       System.out.println("register receiver: ");
   
                       mktStmr.receiver(new DeviceStreamReceiver(instCache));
   
                       // Stream market data ticks into the system (reduced to a single tick for this reproduction).
                       System.out.println("begin write data: ");
                       for (int i = 1; i <= 1; i++) {
                           int idx = RAND.nextInt(INSTRUMENTS.length);
   
                           // Use gaussian distribution to ensure that
                           // numbers closer to 0 have higher probability.
                           double price = round2(INITIAL_PRICES[idx] + RAND.nextGaussian());
   
                           mktStmr.addData(INSTRUMENTS[idx], new MyDouble(price));
   
                           if (i % 500_000 == 0)
                               System.out.println("Number of tuples streamed into Ignite: " + i);
                       }
                   }
   
                   System.out.println("try to query data: ");
   
                   // Select top 3 best performing instruments.
                   SqlFieldsQuery top3qry = new SqlFieldsQuery(
                       "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3");
   
                   // Execute queries.
                   List<List<?>> top3 = instCache.query(top3qry).getAll();
   
                   System.out.println("Top performing financial instruments: " + top3.toString());
               }
               finally {
                   // Distributed cache could be removed from cluster only by #destroyCache() call.
                   ignite.destroyCache(mktDataCfg.getName());
                   ignite.destroyCache(instCfg.getName());
               }
           }
       }
   
       public static class DeviceStreamReceiver implements StreamReceiver<String, MyDouble> {
           private IgniteCache<String, Instrument> instCache;
           public DeviceStreamReceiver(IgniteCache<String, Instrument> cache) {
               instCache = cache;
           }
   
           @Override
           public void receive(IgniteCache<String, MyDouble> cache, Collection<Map.Entry<String, MyDouble>> entries) {
               //CommunicationTrackKafkaProducer producer = CommunicationTrackKafkaProducer.getInstance();
   
               for (Map.Entry<String, MyDouble> entry : entries) {
                   String symbol = entry.getKey();
                   MyDouble tick = entry.getValue();
   
                   Instrument inst = instCache.get(symbol);
   
                   if (inst == null)
                       inst = new Instrument(symbol);
   
                   // Don't populate market cache, as we don't use it for querying.
                   // Update cached instrument based on the latest market tick.
                   inst.update(tick.getValue());
   
                   instCache.put(symbol, inst);
               }
           }
       }
   
       /**
        * Rounds double value to two decimal places.
        *
        * @param val value to be rounded.
        * @return rounded double value.
        */
       private static double round2(double val) {
           return Math.floor(100 * val + 0.5) / 100;
       }
   
       /**
        * Financial instrument.
        */
       public static class Instrument implements Serializable {
           /** Instrument symbol. */
           @QuerySqlField(index = true)
           private final String symbol;
   
           /** Open price. */
           @QuerySqlField(index = true)
           private double open;
   
           /** Close price. */
           @QuerySqlField(index = true)
           private double latest;
   
           /**
            * @param symbol Symbol.
            */
           public Instrument(String symbol) {
               this.symbol = symbol;
           }
   
           /**
            * Updates this instrument based on the latest market tick price.
            *
            * @param price Latest price.
            */
           public void update(double price) {
               if (open == 0)
                   open = price;
   
               this.latest = price;
           }
       }
   
       public static class MyDouble implements Serializable {
           private Double value;
   
           public MyDouble(Double value) {
               this.value = value;
           }
   
           public Double getValue() {
               return value;
           }
   
           public void setValue(Double value) {
               this.value = value;
           }
       }
   }
   ```

