Oh, sorry, I didn't immediately understand what you meant. Thank you!

Thu, Jul 2, 2020 at 19:25, Ilya Kasnacheev <[email protected]>:
> Hello!
>
> init() will be called before execute() and stop().
>
> To be extra sure, you can check for null.
>
> Regards,
> --
> Ilya Kasnacheev
>
>
> Thu, Jul 2, 2020 at 19:06, Maxim Volkomorov <[email protected]>:
>
>> OK, if I put the assignment in init(), how will I start it in the execute()
>> method and stop it in cancel()? This example was taken from here:
>> http://apache-ignite-users.70518.x6.nabble.com/Kindly-tell-me-where-to-find-these-jar-files-td12649.html
>>
>>
>> Thu, Jul 2, 2020 at 18:57, Ilya Kasnacheev <[email protected]>:
>>
>>> Hello!
>>>
>>> You should do the assignment in the init() method.
>>>
>>> Don't create the KafkaStreamer before the service is sent over and is ready for
>>> initialization.
>>>
>>> Regards,
>>> --
>>> Ilya Kasnacheev
>>>
>>>
>>> Thu, Jul 2, 2020 at 18:54, Maxim Volkomorov <[email protected]>:
>>>
>>>> public class KafkaStreamerService implements Service {
>>>>
>>>>     public static final String SERVICE_NAME = "KafkaStreamerService";
>>>>     private static final long serialVersionUID = 1L;
>>>>
>>>>     @IgniteInstanceResource
>>>>     private Ignite ignite;
>>>>     private KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>>>     private IgniteLogger log;
>>>>
>>>> Thu, Jul 2, 2020 at 18:51, Ilya Kasnacheev <[email protected]>:
>>>>
>>>>> Hello!
>>>>>
>>>>> Where do you assign the kafkaStreamer field?
>>>>>
>>>>> Regards,
>>>>> --
>>>>> Ilya Kasnacheev
>>>>>
>>>>>
>>>>> Thu, Jul 2, 2020 at 18:46, Maxim Volkomorov <[email protected]>:
>>>>>
>>>>>> I have now disabled node filtering. Do you mean that I started KafkaStreamer
>>>>>> in init()?
>>>>>> I start KafkaStreamer like this:
>>>>>>
>>>>>> @Override
>>>>>> public void execute(ServiceContext ctx) throws Exception {
>>>>>>     log.info("KafkaStreamerService starting ...");
>>>>>>     kafkaStreamer.start();
>>>>>>     log.info("KafkaStreamerService started OK");
>>>>>>
>>>>>> In init() I only configure the KafkaStreamer parameters.
>>>>>>
>>>>>> @Override
>>>>>> public void init(ServiceContext ctx) throws Exception {
>>>>>>     log = ignite.log();
>>>>>>
>>>>>>     IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("kafkaCache");
>>>>>>     stmr.allowOverwrite(true);
>>>>>>     stmr.autoFlushFrequency(1000);
>>>>>>
>>>>>>     kafkaStreamer.setIgnite(ignite);
>>>>>>     kafkaStreamer.setStreamer(stmr);
>>>>>>     kafkaStreamer.setThreads(4);
>>>>>>     ...
>>>>>>
>>>>>>
>>>>>> If I have to avoid fat instances, how can I share the kafkaStreamer
>>>>>> instance between init(), execute() and cancel()?
>>>>>>
>>>>>>
>>>>>> Thu, Jul 2, 2020 at 16:53, Ilya Kasnacheev <[email protected]>:
>>>>>>
>>>>>>> Hello!
>>>>>>>
>>>>>>> You should probably start KafkaStreamer on the remote node when the
>>>>>>> service is initialized (init()), instead of starting it in e.g. the
>>>>>>> constructor and trying to send it to the remote node.
>>>>>>>
>>>>>>> Avoid putting fat instances in the fields of
>>>>>>> service/compute/predicate classes.
>>>>>>>
>>>>>>> Regards,
>>>>>>> --
>>>>>>> Ilya Kasnacheev
>>>>>>>
>>>>>>>
>>>>>>> Thu, Jul 2, 2020 at 10:42, Maxim Volkomorov <[email protected]>:
>>>>>>>
>>>>>>>> I have 1 data node and 1 service with streaming.
>>>>>>>> I have a filter for the service:
>>>>>>>>
>>>>>>>> <property name="nodeFilter">
>>>>>>>>     <bean class="common.filters.KafkaStreamerServiceFilter"/>
>>>>>>>> </property>
>>>>>>>>
>>>>>>>> public boolean apply(ClusterNode node) {
>>>>>>>>     Boolean dataNode = node.attribute("kafkastreamer.service.node");
>>>>>>>>
>>>>>>>>     return dataNode != null && dataNode;
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> I get a marshalling error, java.io.NotSerializableException:
>>>>>>>> org.apache.ignite.stream.kafka.KafkaStreamer, when using KafkaStreamer in my
>>>>>>>> service:
>>>>>>>>
>>>>>>>> private KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>>>>>>>
>>>>>>>> I can only start the service with:
>>>>>>>>
>>>>>>>> private static KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>>>>>>>
>>>>>>>> Is it because Ignite is trying to transfer the KafkaStreamer instance
>>>>>>>> between nodes?
>>>>>>>>
>>>>>>>> Log:
>>>>>>>>
>>>>>>>> [2020-07-02 10:27:47,552][ERROR][main][IgniteServiceProcessor]
>>>>>>>> Failed to marshal service with configured marshaller
>>>>>>>> [name=KafkaStreamerService,
>>>>>>>> srvc=services.kafkastreamer.KafkaStreamerService@3dedb4a6,
>>>>>>>> marsh=JdkMarshaller [clsFilter=null]]
>>>>>>>> class org.apache.ignite.IgniteCheckedException: Failed to serialize
>>>>>>>> object: services.kafkastreamer.KafkaStreamerService@3dedb4a6
>>>>>>>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:102)
>>>>>>>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:109)
>>>>>>>>     at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.marshal(AbstractNodeNameAwareMarshaller.java:57)
>>>>>>>>     at org.apache.ignite.internal.util.IgniteUtils.marshal(IgniteUtils.java:10386)
>>>>>>>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.prepareServiceConfigurations(IgniteServiceProcessor.java:583)
>>>>>>>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.staticallyConfiguredServices(IgniteServiceProcessor.java:1541)
>>>>>>>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.collectJoiningNodeData(IgniteServiceProcessor.java:354)
>>>>>>>>     at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$5.collect(GridDiscoveryManager.java:861)
>>>>>>>>     at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.collectExchangeData(TcpDiscoverySpi.java:2032)
>>>>>>>>     at org.apache.ignite.spi.discovery.tcp.ServerImpl.joinTopology(ServerImpl.java:1029)
>>>>>>>>     at org.apache.ignite.spi.discovery.tcp.ServerImpl.spiStart(ServerImpl.java:427)
>>>>>>>>     at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.spiStart(TcpDiscoverySpi.java:2099)
>>>>>>>>     at org.apache.ignite.internal.managers.GridManagerAdapter.startSpi(GridManagerAdapter.java:299)
>>>>>>>>     at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.start(GridDiscoveryManager.java:943)
>>>>>>>>     at org.apache.ignite.internal.IgniteKernal.startManager(IgniteKernal.java:1960)
>>>>>>>>     at org.apache.ignite.internal.IgniteKernal.start(IgniteKernal.java:1276)
>>>>>>>>     at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start0(IgnitionEx.java:2045)
>>>>>>>>     at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start(IgnitionEx.java:1703)
>>>>>>>>     at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1117)
>>>>>>>>     at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1035)
>>>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:921)
>>>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:820)
>>>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:690)
>>>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:659)
>>>>>>>>     at org.apache.ignite.Ignition.start(Ignition.java:346)
>>>>>>>>     at app.KafkaStreamerServiceNodeStartup.main(KafkaStreamerServiceNodeStartup.java:40)
>>>>>>>> Caused by: java.io.NotSerializableException:
>>>>>>>> org.apache.ignite.stream.kafka.KafkaStreamer
>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>>>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:97)
>>>>>>>>     ... 25 more
>>>>>>>>
>>>>>>>
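
The behaviour discussed in this thread can be reproduced without Ignite, since the stack trace shows JdkMarshaller delegating to java.io.ObjectOutputStream. The sketch below uses only the JDK. FakeStreamer is a hypothetical stand-in for KafkaStreamer, and the init()/execute()/cancel() methods only imitate the Ignite Service lifecycle; none of this is Ignite API. Marking the field transient is an extra precaution of this sketch, not something stated in the thread: leaving the field null until init() would also serialize cleanly.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientFieldDemo {
    /** Hypothetical stand-in for KafkaStreamer: deliberately not Serializable. */
    static class FakeStreamer {
        boolean running;
        void start() { running = true; }
        void stop() { running = false; }
    }

    /** Mirrors the failing service: the instance field is created eagerly. */
    static class EagerService implements Serializable {
        private static final long serialVersionUID = 1L;
        private FakeStreamer streamer = new FakeStreamer();
    }

    /** Mirrors the workaround from the first message: static fields are not serialized. */
    static class StaticService implements Serializable {
        private static final long serialVersionUID = 1L;
        private static FakeStreamer streamer = new FakeStreamer();
    }

    /** Mirrors the advice in the thread: assign in init(), null-check before use. */
    static class LazyService implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient FakeStreamer streamer; // transient is this sketch's assumption

        void init() { streamer = new FakeStreamer(); }
        void execute() { if (streamer != null) streamer.start(); }
        void cancel() { if (streamer != null) streamer.stop(); }
        boolean running() { return streamer != null && streamer.running; }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    /** Returns false only when Java serialization rejects the object graph. */
    static boolean canSerialize(Object o) {
        try {
            serialize(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Serializes and deserializes, imitating a service being sent to another node. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T o) {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(serialize(o)))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("eager field serializable:  " + canSerialize(new EagerService()));
        System.out.println("static field serializable: " + canSerialize(new StaticService()));

        LazyService remote = roundTrip(new LazyService());
        remote.init();     // the streamer is created only after the "transfer"
        remote.execute();
        System.out.println("lazy service running:      " + remote.running());
        remote.cancel();
    }
}
```

In a real Ignite service the same shape would apply: keep the field unassigned when the service object is deployed, build and configure the streamer in init(), start it in execute() and stop it in cancel().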
