oh sorry, I didn’t immediately understand what you mean.Thank you!

чт, 2 июл. 2020 г. в 19:25, Ilya Kasnacheev <[email protected]>:

> Hello!
>
> init() will be called before execute() and stop().
>
> To be extra sure, you can check for null.
>
> Regards,
> --
> Ilya Kasnacheev
>
>
> чт, 2 июл. 2020 г. в 19:06, Maxim Volkomorov <[email protected]>:
>
>> Ok, if i put an assignment to init(), how will i start it in execute()
>> method, and stop in canсel()? This example was taken here
>> http://apache-ignite-users.70518.x6.nabble.com/Kindly-tell-me-where-to-find-these-jar-files-td12649.html
>>
>>
>> чт, 2 июл. 2020 г. в 18:57, Ilya Kasnacheev <[email protected]>:
>>
>>> Hello!
>>>
>>> You should do the assignment in init() method.
>>>
>>> Don't create KafkaStreamer before service is sent over and is ready for
>>> initialization.
>>>
>>> Regards,
>>> --
>>> Ilya Kasnacheev
>>>
>>>
>>> чт, 2 июл. 2020 г. в 18:54, Maxim Volkomorov <[email protected]>:
>>>
>>>> public class KafkaStreamerService implements Service {
>>>>
>>>>     public static final String SERVICE_NAME = "KafkaStreamerService";
>>>>     private static final long serialVersionUID = 1L;
>>>>
>>>>     @IgniteInstanceResource
>>>>     private Ignite ignite;
>>>>     private KafkaStreamer<String, String> kafkaStreamer = new
>>>> KafkaStreamer<>();
>>>>     private IgniteLogger log;
>>>>
>>>> чт, 2 июл. 2020 г. в 18:51, Ilya Kasnacheev <[email protected]
>>>> >:
>>>>
>>>>> Hello!
>>>>>
>>>>> Where do you assign kafkaStreamer field?
>>>>>
>>>>> Regards,
>>>>> --
>>>>> Ilya Kasnacheev
>>>>>
>>>>>
>>>>> чт, 2 июл. 2020 г. в 18:46, Maxim Volkomorov <[email protected]>:
>>>>>
>>>>>>     Now i disabled node Filtering. You mean i started KafkaStreamer
>>>>>> in init()?
>>>>>> I started KafkaStreamer like:
>>>>>>
>>>>>> @Override
>>>>>>     public void execute(ServiceContext ctx) throws Exception {
>>>>>>         log.info("KafkaStreamerService starting ...");
>>>>>>         kafkaStreamer.start();
>>>>>>         log.info("KafkaStreamerService started OK");
>>>>>>
>>>>>> In init() i only configure KafkaStreamer parameters.
>>>>>>
>>>>>> @Override
>>>>>>     public void init(ServiceContext ctx) throws Exception {
>>>>>>         log = ignite.log();
>>>>>>
>>>>>>         IgniteDataStreamer<String, String> stmr =
>>>>>> ignite.dataStreamer("kafkaCache");
>>>>>>         stmr.allowOverwrite(true);
>>>>>>         stmr.autoFlushFrequency(1000);
>>>>>>
>>>>>>         kafkaStreamer.setIgnite(ignite);
>>>>>>         kafkaStreamer.setStreamer(stmr);
>>>>>>         kafkaStreamer.setThreads(4);
>>>>>> ...
>>>>>>
>>>>>>
>>>>>> If i have to avoid fat instances, how i can share kafkaStreamer
>>>>>> instance between init(), execute() and cancel()?
>>>>>>
>>>>>>
>>>>>> чт, 2 июл. 2020 г. в 16:53, Ilya Kasnacheev <
>>>>>> [email protected]>:
>>>>>>
>>>>>>> Hello!
>>>>>>>
>>>>>>> You should probably start KafkaStreamer on remote node when the
>>>>>>> service is initialized (init()), instead of starting it in e.g. 
>>>>>>> constructor
>>>>>>> and trying to send it to remote node.
>>>>>>>
>>>>>>> Avoid putting fat instances in the fields of
>>>>>>> service/compute/predicate classes.
>>>>>>>
>>>>>>> Regards,
>>>>>>> --
>>>>>>> Ilya Kasnacheev
>>>>>>>
>>>>>>>
>>>>>>> чт, 2 июл. 2020 г. в 10:42, Maxim Volkomorov <[email protected]>:
>>>>>>>
>>>>>>>> I have 1 DataNod and 1 Service with streaming. I have a filter for
>>>>>>>> service:
>>>>>>>>
>>>>>>>> <property name="nodeFilter">
>>>>>>>> <bean class="common.filters.KafkaStreamerServiceFilter"/>
>>>>>>>> </property>
>>>>>>>>
>>>>>>>> public boolean apply(ClusterNode node) {
>>>>>>>> Boolean dataNode = node.attribute("kafkastreamer.service.node");
>>>>>>>>
>>>>>>>> return dataNode != null && dataNode;
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> I have a marshalling error java.io.NotSerializableException:
>>>>>>>> org.apache.ignite.stream.kafka.KafkaStreamer using KafkaStreamer in my
>>>>>>>> Service:
>>>>>>>>
>>>>>>>> private KafkaStreamer<String, String> kafkaStreamer = new
>>>>>>>> KafkaStreamer<>();
>>>>>>>>
>>>>>>>> I only can start service with:
>>>>>>>>
>>>>>>>> private static KafkaStreamer<String, String> kafkaStreamer = new
>>>>>>>> KafkaStreamer<>();
>>>>>>>>
>>>>>>>> Is it because Ignite trying data transfer KafkaStreamer instance
>>>>>>>> between nodes?
>>>>>>>>
>>>>>>>> Log:
>>>>>>>>
>>>>>>>> [2020-07-02 10:27:47,552][ERROR][main][IgniteServiceProcessor]
>>>>>>>> Failed to marshal service with configured marshaller
>>>>>>>> [name=KafkaStreamerService,
>>>>>>>> srvc=services.kafkastreamer.KafkaStreamerService@3dedb4a6,
>>>>>>>> marsh=JdkMarshaller [clsFilter=null]]
>>>>>>>> class org.apache.ignite.IgniteCheckedException: Failed to serialize
>>>>>>>> object: services.kafkastreamer.KafkaStreamerService@3dedb4a6
>>>>>>>> at
>>>>>>>> org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:102)
>>>>>>>> at
>>>>>>>> org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:109)
>>>>>>>> at
>>>>>>>> org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.marshal(AbstractNodeNameAwareMarshaller.java:57)
>>>>>>>> at
>>>>>>>> org.apache.ignite.internal.util.IgniteUtils.marshal(IgniteUtils.java:10386)
>>>>>>>> at
>>>>>>>> org.apache.ignite.internal.processors.service.IgniteServiceProcessor.prepareServiceConfigurations(IgniteServiceProcessor.java:583)
>>>>>>>> at
>>>>>>>> org.apache.ignite.internal.processors.service.IgniteServiceProcessor.staticallyConfiguredServices(IgniteServiceProcessor.java:1541)
>>>>>>>> at
>>>>>>>> org.apache.ignite.internal.processors.service.IgniteServiceProcessor.collectJoiningNodeData(IgniteServiceProcessor.java:354)
>>>>>>>> at
>>>>>>>> org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$5.collect(GridDiscoveryManager.java:861)
>>>>>>>> at
>>>>>>>> org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.collectExchangeData(TcpDiscoverySpi.java:2032)
>>>>>>>> at
>>>>>>>> org.apache.ignite.spi.discovery.tcp.ServerImpl.joinTopology(ServerImpl.java:1029)
>>>>>>>> at
>>>>>>>> org.apache.ignite.spi.discovery.tcp.ServerImpl.spiStart(ServerImpl.java:427)
>>>>>>>> at
>>>>>>>> org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.spiStart(TcpDiscoverySpi.java:2099)
>>>>>>>> at
>>>>>>>> org.apache.ignite.internal.managers.GridManagerAdapter.startSpi(GridManagerAdapter.java:299)
>>>>>>>> at
>>>>>>>> org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.start(GridDiscoveryManager.java:943)
>>>>>>>> at
>>>>>>>> org.apache.ignite.internal.IgniteKernal.startManager(IgniteKernal.java:1960)
>>>>>>>> at
>>>>>>>> org.apache.ignite.internal.IgniteKernal.start(IgniteKernal.java:1276)
>>>>>>>> at
>>>>>>>> org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start0(IgnitionEx.java:2045)
>>>>>>>> at
>>>>>>>> org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start(IgnitionEx.java:1703)
>>>>>>>> at
>>>>>>>> org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1117)
>>>>>>>> at
>>>>>>>> org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1035)
>>>>>>>> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:921)
>>>>>>>> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:820)
>>>>>>>> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:690)
>>>>>>>> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:659)
>>>>>>>> at org.apache.ignite.Ignition.start(Ignition.java:346)
>>>>>>>> at
>>>>>>>> app.KafkaStreamerServiceNodeStartup.main(KafkaStreamerServiceNodeStartup.java:40)
>>>>>>>> Caused by: java.io.NotSerializableException:
>>>>>>>> org.apache.ignite.stream.kafka.KafkaStreamer
>>>>>>>> at
>>>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>>> at
>>>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>>>> at
>>>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>>>> at
>>>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>>>> at
>>>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>>>> at
>>>>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>>>> at
>>>>>>>> org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:97)
>>>>>>>> ... 25 more
>>>>>>>>
>>>>>>>

Reply via email to