iswarezwp opened a new issue, #11392:
URL: https://github.com/apache/ignite/issues/11392
## issue
When I use `IgniteDataStreamer` with a user-defined value class, like this:
```
cfg.setPeerClassLoadingEnabled(true);
Ignite ignite = Ignition.start(cfg);
IgniteDataStreamer<String, MyDouble> mktStmr =
ignite.dataStreamer(mktCache.getName());
```
I got the following error. It seems that the `MyDouble` class is not sent to the
remote node:
```
SEVERE: DataStreamer operation failed.
class org.apache.ignite.IgniteCheckedException: Failed to finish operation (too many remaps): 32
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5.apply(DataStreamerImpl.java:977)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5.apply(DataStreamerImpl.java:942)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListener(GridFutureAdapter.java:474)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.unblock(GridFutureAdapter.java:350)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.unblockAll(GridFutureAdapter.java:338)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:586)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:565)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:553)
    at org.apache.ignite.internal.util.future.GridCompoundFuture.apply(GridCompoundFuture.java:132)
    at org.apache.ignite.internal.util.future.GridCompoundFuture.apply(GridCompoundFuture.java:46)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListener(GridFutureAdapter.java:474)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.unblock(GridFutureAdapter.java:350)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.unblockAll(GridFutureAdapter.java:338)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:586)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:565)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer.onResponse(DataStreamerImpl.java:2121)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$3.onMessage(DataStreamerImpl.java:368)
    at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1906)
    at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1527)
    at org.apache.ignite.internal.managers.communication.GridIoManager.access$5300(GridIoManager.java:242)
    at org.apache.ignite.internal.managers.communication.GridIoManager$9.execute(GridIoManager.java:1420)
    at org.apache.ignite.internal.managers.communication.TraceRunnable.run(TraceRunnable.java:55)
    at org.apache.ignite.internal.util.StripedExecutor$Stripe.body(StripedExecutor.java:637)
    at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:125)
    at java.lang.Thread.run(Thread.java:750)
Caused by: class org.apache.ignite.IgniteCheckedException: DataStreamer request failed [node=4167fc2a-b0c4-45a7-bf8e-52a97457ad34]
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer.onResponse(DataStreamerImpl.java:2112)
    ... 9 more
Caused by: class org.apache.ignite.binary.BinaryInvalidTypeException: com.nimblex.StreamVisitorExample$MyDouble
    at org.apache.ignite.internal.binary.BinaryContext.descriptorForTypeId(BinaryContext.java:741)
    at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize0(BinaryReaderExImpl.java:1772)
    at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1731)
    at org.apache.ignite.internal.binary.BinaryObjectImpl.deserializeValue(BinaryObjectImpl.java:866)
    at org.apache.ignite.internal.binary.BinaryObjectImpl.value(BinaryObjectImpl.java:198)
    at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinary(CacheObjectUtils.java:199)
    at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinaryIfNeeded(CacheObjectUtils.java:78)
    at org.apache.ignite.internal.processors.cache.CacheObjectContext.unwrapBinaryIfNeeded(CacheObjectContext.java:138)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry$1.getValue(DataStreamerEntry.java:96)
    at com.nimblex.StreamVisitorExample$DeviceStreamReceiver.receive(StreamVisitorExample.java:143)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:141)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:394)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:299)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:60)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:90)
    ... 8 more
Caused by: java.lang.ClassNotFoundException: com.nimblex.StreamVisitorExample$MyDouble
    at java.net.URLClassLoader.findClass(URLClassLoader.java:407)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:9373)
    at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:9311)
    at org.apache.ignite.internal.MarshallerContextImpl.getClass(MarshallerContextImpl.java:384)
    at org.apache.ignite.internal.binary.BinaryContext.descriptorForTypeId(BinaryContext.java:717)
    ... 22 more
```
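The innermost cause in the trace is a `ClassNotFoundException` for `MyDouble`, raised on the remote node when the receiver calls `entry.getValue()` and the value has to be turned back into an object. The sketch below reproduces only that general failure mode with plain JDK serialization. It does not use Ignite or its binary marshaller, and `MissingClassDemo`, `serialize` and `canDeserialize` are hypothetical names made up for illustration. The "receiver" is simulated by resolving classes through the parent of the system class loader, which cannot see application classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical stand-alone demo; it does not use Ignite at all. */
class MissingClassDemo {
    /** Stand-in for the user-defined value class from the report. */
    public static class MyDouble implements Serializable {
        private final Double value;

        public MyDouble(Double value) {
            this.value = value;
        }

        public Double getValue() {
            return value;
        }
    }

    /** Serializes an object with plain JDK serialization (an assumption, not Ignite's format). */
    public static byte[] serialize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns true if every class named in the payload can be resolved through the given loader. */
    public static boolean canDeserialize(byte[] data, ClassLoader ldr) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
                // Look the class up only through the supplied loader.
                return Class.forName(desc.getName(), false, ldr);
            }
        }) {
            in.readObject();
            return true;
        }
        catch (ClassNotFoundException e) {
            // Same kind of failure as the innermost "Caused by" in the trace.
            return false;
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(new MyDouble(194.9));

        // "Sender": the loader that defined MyDouble.
        ClassLoader sender = MissingClassDemo.class.getClassLoader();
        // "Receiver": a loader that only knows JDK classes.
        ClassLoader receiver = ClassLoader.getSystemClassLoader().getParent();

        System.out.println("sender can deserialize: " + canDeserialize(data, sender));
        System.out.println("receiver can deserialize: " + canDeserialize(data, receiver));
    }
}
```

Run as is, the sender side should report `true` and the simulated receiver `false`: the bytes arrive intact, but the class they name is not visible to the loader doing the lookup.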
## code
The code is modified from
https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.nimblex;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.apache.ignite.stream.StreamReceiver;
import org.apache.ignite.stream.StreamVisitor;

public class StreamVisitorExample {
    /** Random number generator. */
    private static final Random RAND = new Random();

    /** The list of instruments. */
    private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT",
        "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"};

    /** The list of initial instrument prices. */
    private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21,
        23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50};

    public static void main(String[] args) throws Exception {
        TcpDiscoverySpi spi = new TcpDiscoverySpi();

        // Create a new instance of the TCP discovery multicast IP finder.
        TcpDiscoveryMulticastIpFinder tcMp = new TcpDiscoveryMulticastIpFinder();
        tcMp.setAddresses(Arrays.asList("192.168.8.63")); // change your IP address here

        // Set the multicast IP finder for the SPI.
        spi.setIpFinder(tcMp);

        // Create a new Ignite configuration.
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setClientMode(true);

        // Set the discovery SPI on the Ignite configuration.
        cfg.setDiscoverySpi(spi);

        // Fix for: Remote node has deployment mode different from local
        cfg.setPeerClassLoadingEnabled(true);
        // cfg.setDeploymentMode(DeploymentMode.CONTINUOUS);

        try (Ignite ignite = Ignition.start(cfg)) {
            // Market data cache with default configuration.
            CacheConfiguration<String, MyDouble> mktDataCfg = new CacheConfiguration<>("marketTicks");

            // Financial instrument cache configuration.
            CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instCache");

            // Index key and value for querying financial instruments.
            // Note that Instrument class has @QuerySqlField annotation for secondary field indexing.
            instCfg.setIndexedTypes(String.class, Instrument.class);

            // Auto-close caches at the end of the example.
            try (
                IgniteCache<String, MyDouble> mktCache = ignite.getOrCreateCache(mktDataCfg);
                IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg)
            ) {
                try (IgniteDataStreamer<String, MyDouble> mktStmr = ignite.dataStreamer(mktCache.getName())) {
                    // Note that we receive market data, but do not populate 'mktCache' (it remains empty).
                    // Instead we update the instruments in the 'instCache'.
                    // Since both, 'instCache' and 'mktCache' use the same key, updates are collocated.
                    System.out.println("register receiver: ");
                    mktStmr.receiver(new DeviceStreamReceiver(instCache));

                    // Stream a single market data tick (the original example streams 10 million).
                    System.out.println("begin write data: ");
                    for (int i = 1; i <= 1; i++) {
                        int idx = RAND.nextInt(INSTRUMENTS.length);

                        // Use gaussian distribution to ensure that
                        // numbers closer to 0 have higher probability.
                        double price = round2(INITIAL_PRICES[idx] + RAND.nextGaussian());

                        mktStmr.addData(INSTRUMENTS[idx], new MyDouble(price));

                        if (i % 500_000 == 0)
                            System.out.println("Number of tuples streamed into Ignite: " + i);
                    }
                }

                System.out.println("try to query data: ");

                // Select top 3 best performing instruments.
                SqlFieldsQuery top3qry = new SqlFieldsQuery(
                    "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3");

                // Execute queries.
                List<List<?>> top3 = instCache.query(top3qry).getAll();
                System.out.println("Top performing financial instruments: " + top3.toString());
            }
            finally {
                // Distributed cache could be removed from cluster only by #destroyCache() call.
                ignite.destroyCache(mktDataCfg.getName());
                ignite.destroyCache(instCfg.getName());
            }
        }
    }

    public static class DeviceStreamReceiver implements StreamReceiver<String, MyDouble> {
        private IgniteCache<String, Instrument> instCache;

        public DeviceStreamReceiver(IgniteCache<String, Instrument> cache) {
            instCache = cache;
        }

        @Override
        public void receive(IgniteCache<String, MyDouble> cache,
            Collection<Map.Entry<String, MyDouble>> entries) {
            //CommunicationTrackKafkaProducer producer = CommunicationTrackKafkaProducer.getInstance();
            for (Map.Entry<String, MyDouble> entry : entries) {
                String symbol = entry.getKey();
                MyDouble tick = entry.getValue();

                Instrument inst = instCache.get(symbol);
                if (inst == null)
                    inst = new Instrument(symbol);

                // Don't populate market cache, as we don't use it for querying.
                // Update cached instrument based on the latest market tick.
                inst.update(tick.getValue());
                instCache.put(symbol, inst);
            }
        }
    }

    /**
     * Rounds double value to two decimal places.
     *
     * @param val value to be rounded.
     * @return rounded double value.
     */
    private static double round2(double val) {
        return Math.floor(100 * val + 0.5) / 100;
    }

    /**
     * Financial instrument.
     */
    public static class Instrument implements Serializable {
        /** Instrument symbol. */
        @QuerySqlField(index = true)
        private final String symbol;

        /** Open price. */
        @QuerySqlField(index = true)
        private double open;

        /** Latest price. */
        @QuerySqlField(index = true)
        private double latest;

        /**
         * @param symbol Symbol.
         */
        public Instrument(String symbol) {
            this.symbol = symbol;
        }

        /**
         * Updates this instrument based on the latest market tick price.
         *
         * @param price Latest price.
         */
        public void update(double price) {
            if (open == 0)
                open = price;
            this.latest = price;
        }
    }

    public static class MyDouble implements Serializable {
        private Double value;

        public MyDouble(Double value) {
            this.value = value;
        }

        public Double getValue() {
            return value;
        }

        public void setValue(Double value) {
            this.value = value;
        }
    }
}
```