OK, if I move the assignment into init(), how do I start the streamer in the
execute() method and stop it in cancel()? This example was taken from here:
http://apache-ignite-users.70518.x6.nabble.com/Kindly-tell-me-where-to-find-these-jar-files-td12649.html
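One possible reading of the advice quoted below, as a minimal sketch rather than a tested Ignite service: keep the streamer in a transient instance field, assign it in init(), start it in execute() and stop it in cancel(). The sketch uses only the JDK so that it runs on its own. FakeStreamer, EagerService, LazyService and canSerialize are hypothetical stand-ins invented for illustration, not Ignite classes. In the real service the three methods would take a ServiceContext argument, as in the code quoted below.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldSketch {
    /** Hypothetical stand-in for KafkaStreamer: a heavy object that is NOT Serializable. */
    static class FakeStreamer {
        boolean running;
        void start() { running = true; }
        void stop() { running = false; }
    }

    /** Mirrors the failing service: the field is created eagerly and serialized with the service. */
    static class EagerService implements Serializable {
        private static final long serialVersionUID = 1L;
        private FakeStreamer streamer = new FakeStreamer();
    }

    /** The field is transient, so it is skipped by serialization and assigned later in init(). */
    static class LazyService implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient FakeStreamer streamer;

        void init() { streamer = new FakeStreamer(); }            // create and configure here
        void execute() { streamer.start(); }                      // same instance, just start it
        void cancel() { if (streamer != null) streamer.stop(); }  // guard: init() may not have run
        boolean isRunning() { return streamer != null && streamer.running; }
    }

    /** Returns true if plain JDK serialization of the object succeeds. */
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("eager service serializable: " + canSerialize(new EagerService()));
        LazyService svc = new LazyService();
        System.out.println("lazy service serializable: " + canSerialize(svc));
        svc.init();
        svc.execute();
        System.out.println("running after execute: " + svc.isRunning());
        svc.cancel();
        System.out.println("running after cancel: " + svc.isRunning());
    }
}
```

The instance field is shared between the three methods because they are all called on the same deserialized service object. A static field avoids the exception for a similar reason (static fields are not serialized), but it is shared by every instance in the JVM, so a transient instance field is probably the cleaner choice.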


Thu, Jul 2, 2020 at 18:57, Ilya Kasnacheev <[email protected]>:

> Hello!
>
> You should do the assignment in the init() method.
>
> Don't create the KafkaStreamer before the service has been sent over and is
> ready for initialization.
>
> Regards,
> --
> Ilya Kasnacheev
>
>
> Thu, Jul 2, 2020 at 18:54, Maxim Volkomorov <[email protected]>:
>
>> public class KafkaStreamerService implements Service {
>>
>>     public static final String SERVICE_NAME = "KafkaStreamerService";
>>     private static final long serialVersionUID = 1L;
>>
>>     @IgniteInstanceResource
>>     private Ignite ignite;
>>     private KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>     private IgniteLogger log;
>>
>> Thu, Jul 2, 2020 at 18:51, Ilya Kasnacheev <[email protected]>:
>>
>>> Hello!
>>>
>>> Where do you assign the kafkaStreamer field?
>>>
>>> Regards,
>>> --
>>> Ilya Kasnacheev
>>>
>>>
>>> Thu, Jul 2, 2020 at 18:46, Maxim Volkomorov <[email protected]>:
>>>
>>>> I have now disabled node filtering. Do you mean I should start
>>>> KafkaStreamer in init()?
>>>> I start KafkaStreamer like this:
>>>>
>>>> @Override
>>>>     public void execute(ServiceContext ctx) throws Exception {
>>>>         log.info("KafkaStreamerService starting ...");
>>>>         kafkaStreamer.start();
>>>>         log.info("KafkaStreamerService started OK");
>>>>
>>>> In init() I only configure the KafkaStreamer parameters.
>>>>
>>>> @Override
>>>>     public void init(ServiceContext ctx) throws Exception {
>>>>         log = ignite.log();
>>>>
>>>>         IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("kafkaCache");
>>>>         stmr.allowOverwrite(true);
>>>>         stmr.autoFlushFrequency(1000);
>>>>
>>>>         kafkaStreamer.setIgnite(ignite);
>>>>         kafkaStreamer.setStreamer(stmr);
>>>>         kafkaStreamer.setThreads(4);
>>>> ...
>>>>
>>>>
>>>> If I have to avoid fat instances, how can I share the kafkaStreamer
>>>> instance between init(), execute() and cancel()?
>>>>
>>>>
>>>> Thu, Jul 2, 2020 at 16:53, Ilya Kasnacheev <[email protected]>:
>>>>
>>>>> Hello!
>>>>>
>>>>> You should probably start KafkaStreamer on the remote node when the
>>>>> service is initialized (init()), instead of creating it in, e.g., the
>>>>> constructor and trying to send it to the remote node.
>>>>>
>>>>> Avoid putting fat instances in the fields of service, compute, or
>>>>> predicate classes.
>>>>>
>>>>> Regards,
>>>>> --
>>>>> Ilya Kasnacheev
>>>>>
>>>>>
>>>>> Thu, Jul 2, 2020 at 10:42, Maxim Volkomorov <[email protected]>:
>>>>>
>>>>>> I have one data node and one service that does the streaming. I have a
>>>>>> node filter for the service:
>>>>>>
>>>>>> <property name="nodeFilter">
>>>>>> <bean class="common.filters.KafkaStreamerServiceFilter"/>
>>>>>> </property>
>>>>>>
>>>>>> public boolean apply(ClusterNode node) {
>>>>>> Boolean dataNode = node.attribute("kafkastreamer.service.node");
>>>>>>
>>>>>> return dataNode != null && dataNode;
>>>>>> }
>>>>>>
>>>>>>
>>>>>> I get a marshalling error, java.io.NotSerializableException:
>>>>>> org.apache.ignite.stream.kafka.KafkaStreamer, when I use KafkaStreamer
>>>>>> as a field in my service:
>>>>>>
>>>>>> private KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>>>>>
>>>>>> I can only start the service with a static field:
>>>>>>
>>>>>> private static KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>>>>>
>>>>>> Is it because Ignite is trying to transfer the KafkaStreamer instance
>>>>>> between nodes?
>>>>>>
>>>>>> Log:
>>>>>>
>>>>>> [2020-07-02 10:27:47,552][ERROR][main][IgniteServiceProcessor] Failed to marshal service with configured marshaller [name=KafkaStreamerService, srvc=services.kafkastreamer.KafkaStreamerService@3dedb4a6, marsh=JdkMarshaller [clsFilter=null]]
>>>>>> class org.apache.ignite.IgniteCheckedException: Failed to serialize object: services.kafkastreamer.KafkaStreamerService@3dedb4a6
>>>>>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:102)
>>>>>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:109)
>>>>>>     at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.marshal(AbstractNodeNameAwareMarshaller.java:57)
>>>>>>     at org.apache.ignite.internal.util.IgniteUtils.marshal(IgniteUtils.java:10386)
>>>>>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.prepareServiceConfigurations(IgniteServiceProcessor.java:583)
>>>>>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.staticallyConfiguredServices(IgniteServiceProcessor.java:1541)
>>>>>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.collectJoiningNodeData(IgniteServiceProcessor.java:354)
>>>>>>     at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$5.collect(GridDiscoveryManager.java:861)
>>>>>>     at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.collectExchangeData(TcpDiscoverySpi.java:2032)
>>>>>>     at org.apache.ignite.spi.discovery.tcp.ServerImpl.joinTopology(ServerImpl.java:1029)
>>>>>>     at org.apache.ignite.spi.discovery.tcp.ServerImpl.spiStart(ServerImpl.java:427)
>>>>>>     at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.spiStart(TcpDiscoverySpi.java:2099)
>>>>>>     at org.apache.ignite.internal.managers.GridManagerAdapter.startSpi(GridManagerAdapter.java:299)
>>>>>>     at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.start(GridDiscoveryManager.java:943)
>>>>>>     at org.apache.ignite.internal.IgniteKernal.startManager(IgniteKernal.java:1960)
>>>>>>     at org.apache.ignite.internal.IgniteKernal.start(IgniteKernal.java:1276)
>>>>>>     at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start0(IgnitionEx.java:2045)
>>>>>>     at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start(IgnitionEx.java:1703)
>>>>>>     at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1117)
>>>>>>     at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1035)
>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:921)
>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:820)
>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:690)
>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:659)
>>>>>>     at org.apache.ignite.Ignition.start(Ignition.java:346)
>>>>>>     at app.KafkaStreamerServiceNodeStartup.main(KafkaStreamerServiceNodeStartup.java:40)
>>>>>> Caused by: java.io.NotSerializableException: org.apache.ignite.stream.kafka.KafkaStreamer
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:97)
>>>>>>     ... 25 more
>>>>>>
>>>>>
