I have now disabled node filtering. Do you mean that I should start
KafkaStreamer in init()?
At the moment I start KafkaStreamer in execute():
@Override
public void execute(ServiceContext ctx) throws Exception {
    log.info("KafkaStreamerService starting ...");
    kafkaStreamer.start();
    log.info("KafkaStreamerService started OK");
}
In init() I only configure the KafkaStreamer parameters:
@Override
public void init(ServiceContext ctx) throws Exception {
    log = ignite.log();
    IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("kafkaCache");
    stmr.allowOverwrite(true);
    stmr.autoFlushFrequency(1000);
    kafkaStreamer.setIgnite(ignite);
    kafkaStreamer.setStreamer(stmr);
    kafkaStreamer.setThreads(4);
    ...
If I have to avoid fat instances in fields, how can I share the kafkaStreamer
instance between init(), execute() and cancel()?
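
One possible approach, sketched here with plain JDK serialization only and not confirmed anywhere in this thread, is to keep the field but mark it transient and create the instance in init(). A transient field is skipped when the object is serialized, so only the small configuration travels between nodes, while init(), execute() and cancel() still share the same instance on the node where the service runs. HeavyStreamer and StreamerService below are hypothetical stand-ins for KafkaStreamer and the Ignite service; no Ignite API is used, and whether Ignite calls init() at the right moment for your deployment is an assumption to verify.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldSketch {
    /** Hypothetical stand-in for a heavy, non-serializable object such as KafkaStreamer. */
    static class HeavyStreamer {
        boolean started;
        void start() { started = true; }
        void stop() { started = false; }
    }

    /** Hypothetical stand-in for a service with init(), execute() and cancel(). */
    static class StreamerService implements Serializable {
        private static final long serialVersionUID = 1L;

        // Small, serializable configuration travels with the service.
        private final int threads;

        // transient: skipped by Java serialization, so it is null after deserialization.
        private transient HeavyStreamer streamer;

        StreamerService(int threads) { this.threads = threads; }

        // Create the heavy object on the node that actually runs the service.
        void init() { streamer = new HeavyStreamer(); }

        void execute() { streamer.start(); }

        void cancel() { if (streamer != null) streamer.stop(); }

        boolean isRunning() { return streamer != null && streamer.started; }

        int threads() { return threads; }
    }

    /** Serializes and deserializes the service, imitating a transfer to another node. */
    static StreamerService roundTrip(StreamerService svc) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(svc);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (StreamerService) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Serialization succeeds because the non-serializable field is transient.
        StreamerService remote = roundTrip(new StreamerService(4));
        remote.init();
        remote.execute();
        System.out.println("running=" + remote.isRunning() + ", threads=" + remote.threads());
        remote.cancel();
        System.out.println("running=" + remote.isRunning());
    }
}
```

With a static field the marshaller also skips the value, which would explain why that variant starts, but a static field is shared by every service instance in the same JVM.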
Thu, 2 Jul 2020 at 16:53, Ilya Kasnacheev <[email protected]>:
> Hello!
>
> You should probably start KafkaStreamer on remote node when the service is
> initialized (init()), instead of starting it in e.g. constructor and trying
> to send it to remote node.
>
> Avoid putting fat instances in the fields of service/compute/predicate
> classes.
>
> Regards,
> --
> Ilya Kasnacheev
>
>
> Thu, 2 Jul 2020 at 10:42, Maxim Volkomorov <[email protected]>:
>
>> I have 1 data node and 1 service that does streaming. The service has a
>> node filter:
>>
>> <property name="nodeFilter">
>> <bean class="common.filters.KafkaStreamerServiceFilter"/>
>> </property>
>>
>> public boolean apply(ClusterNode node) {
>> Boolean dataNode = node.attribute("kafkastreamer.service.node");
>>
>> return dataNode != null && dataNode;
>> }
>>
>>
>> I get a marshalling error, java.io.NotSerializableException:
>> org.apache.ignite.stream.kafka.KafkaStreamer, when I use KafkaStreamer as a
>> field in my service:
>>
>> private KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>
>> I can only start the service when the field is static:
>>
>> private static KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>
>> Is it because Ignite is trying to transfer the KafkaStreamer instance
>> between nodes?
>>
>> Log:
>>
>> [2020-07-02 10:27:47,552][ERROR][main][IgniteServiceProcessor] Failed to marshal service with configured marshaller [name=KafkaStreamerService, srvc=services.kafkastreamer.KafkaStreamerService@3dedb4a6, marsh=JdkMarshaller [clsFilter=null]]
>> class org.apache.ignite.IgniteCheckedException: Failed to serialize object: services.kafkastreamer.KafkaStreamerService@3dedb4a6
>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:102)
>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:109)
>>     at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.marshal(AbstractNodeNameAwareMarshaller.java:57)
>>     at org.apache.ignite.internal.util.IgniteUtils.marshal(IgniteUtils.java:10386)
>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.prepareServiceConfigurations(IgniteServiceProcessor.java:583)
>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.staticallyConfiguredServices(IgniteServiceProcessor.java:1541)
>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.collectJoiningNodeData(IgniteServiceProcessor.java:354)
>>     at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$5.collect(GridDiscoveryManager.java:861)
>>     at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.collectExchangeData(TcpDiscoverySpi.java:2032)
>>     at org.apache.ignite.spi.discovery.tcp.ServerImpl.joinTopology(ServerImpl.java:1029)
>>     at org.apache.ignite.spi.discovery.tcp.ServerImpl.spiStart(ServerImpl.java:427)
>>     at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.spiStart(TcpDiscoverySpi.java:2099)
>>     at org.apache.ignite.internal.managers.GridManagerAdapter.startSpi(GridManagerAdapter.java:299)
>>     at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.start(GridDiscoveryManager.java:943)
>>     at org.apache.ignite.internal.IgniteKernal.startManager(IgniteKernal.java:1960)
>>     at org.apache.ignite.internal.IgniteKernal.start(IgniteKernal.java:1276)
>>     at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start0(IgnitionEx.java:2045)
>>     at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start(IgnitionEx.java:1703)
>>     at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1117)
>>     at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1035)
>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:921)
>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:820)
>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:690)
>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:659)
>>     at org.apache.ignite.Ignition.start(Ignition.java:346)
>>     at app.KafkaStreamerServiceNodeStartup.main(KafkaStreamerServiceNodeStartup.java:40)
>> Caused by: java.io.NotSerializableException: org.apache.ignite.stream.kafka.KafkaStreamer
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:97)
>>     ... 25 more
>>
>