OK, if I move the assignment into init(), how do I start it in the execute() method and stop it in cancel()? This example was taken from http://apache-ignite-users.70518.x6.nabble.com/Kindly-tell-me-where-to-find-these-jar-files-td12649.html
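One way to share the instance between init(), execute() and cancel() is to keep it in a transient instance field: the field is skipped when the service is marshalled, assigned in init() on the node that runs the service, and then visible to the other two methods. Below is a minimal sketch of that pattern using only the JDK. FakeStreamer and StreamerService are hypothetical stand-ins (not Ignite classes), and the lifecycle methods take no ServiceContext argument to keep the example self-contained. In the real service the calls would presumably be kafkaStreamer.start() in execute() and kafkaStreamer.stop() in cancel().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientStreamerSketch {
    /** Hypothetical stand-in for KafkaStreamer: deliberately NOT Serializable. */
    static class FakeStreamer {
        private boolean running;
        void start() { running = true; }
        void stop() { running = false; }
        boolean isRunning() { return running; }
    }

    /** Mimics the init/execute/cancel lifecycle of a service (simplified, assumed signatures). */
    static class StreamerService implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: skipped by Java serialization, so the service itself can be marshalled.
        private transient FakeStreamer streamer;

        // Create and configure the streamer only after the service has arrived on its node.
        void init() { streamer = new FakeStreamer(); }

        // Start the instance that init() created.
        void execute() { streamer.start(); }

        // Stop it; the null check covers a cancel() that arrives before init().
        void cancel() { if (streamer != null) streamer.stop(); }

        boolean isRunning() { return streamer != null && streamer.isRunning(); }
    }

    /** Serializes and deserializes the service, roughly what happens when it is sent to a node. */
    static StreamerService roundTrip(StreamerService svc) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(svc);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (StreamerService) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        StreamerService deployed = roundTrip(new StreamerService());
        deployed.init();
        deployed.execute();
        System.out.println("running after execute(): " + deployed.isRunning());
        deployed.cancel();
        System.out.println("running after cancel(): " + deployed.isRunning());
    }
}
```

The point of the design is that nothing heavy exists in the object at the moment it is serialized; everything node-local is built in init().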
Thu, 2 Jul 2020 at 18:57, Ilya Kasnacheev <[email protected]>:

> Hello!
>
> You should do the assignment in the init() method.
>
> Don't create KafkaStreamer before the service is sent over and is ready for
> initialization.
>
> Regards,
> --
> Ilya Kasnacheev
>
>
> Thu, 2 Jul 2020 at 18:54, Maxim Volkomorov <[email protected]>:
>
>> public class KafkaStreamerService implements Service {
>>
>>     public static final String SERVICE_NAME = "KafkaStreamerService";
>>     private static final long serialVersionUID = 1L;
>>
>>     @IgniteInstanceResource
>>     private Ignite ignite;
>>     private KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>     private IgniteLogger log;
>>
>> Thu, 2 Jul 2020 at 18:51, Ilya Kasnacheev <[email protected]>:
>>
>>> Hello!
>>>
>>> Where do you assign the kafkaStreamer field?
>>>
>>> Regards,
>>> --
>>> Ilya Kasnacheev
>>>
>>>
>>> Thu, 2 Jul 2020 at 18:46, Maxim Volkomorov <[email protected]>:
>>>
>>>> I have now disabled node filtering. Do you mean I started KafkaStreamer
>>>> in init()?
>>>> I started KafkaStreamer like this:
>>>>
>>>> @Override
>>>> public void execute(ServiceContext ctx) throws Exception {
>>>>     log.info("KafkaStreamerService starting ...");
>>>>     kafkaStreamer.start();
>>>>     log.info("KafkaStreamerService started OK");
>>>>
>>>> In init() I only configure the KafkaStreamer parameters.
>>>>
>>>> @Override
>>>> public void init(ServiceContext ctx) throws Exception {
>>>>     log = ignite.log();
>>>>
>>>>     IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("kafkaCache");
>>>>     stmr.allowOverwrite(true);
>>>>     stmr.autoFlushFrequency(1000);
>>>>
>>>>     kafkaStreamer.setIgnite(ignite);
>>>>     kafkaStreamer.setStreamer(stmr);
>>>>     kafkaStreamer.setThreads(4);
>>>>     ...
>>>>
>>>>
>>>> If I have to avoid fat instances, how can I share the kafkaStreamer
>>>> instance between init(), execute() and cancel()?
>>>>
>>>>
>>>> Thu, 2 Jul 2020 at 16:53, Ilya Kasnacheev <[email protected]>:
>>>>
>>>>> Hello!
>>>>>
>>>>> You should probably start KafkaStreamer on the remote node when the
>>>>> service is initialized (init()), instead of starting it in e.g. the
>>>>> constructor and trying to send it to the remote node.
>>>>>
>>>>> Avoid putting fat instances in the fields of service/compute/predicate
>>>>> classes.
>>>>>
>>>>> Regards,
>>>>> --
>>>>> Ilya Kasnacheev
>>>>>
>>>>>
>>>>> Thu, 2 Jul 2020 at 10:42, Maxim Volkomorov <[email protected]>:
>>>>>
>>>>>> I have 1 DataNode and 1 Service with streaming. I have a filter for
>>>>>> the service:
>>>>>>
>>>>>> <property name="nodeFilter">
>>>>>>     <bean class="common.filters.KafkaStreamerServiceFilter"/>
>>>>>> </property>
>>>>>>
>>>>>> public boolean apply(ClusterNode node) {
>>>>>>     Boolean dataNode = node.attribute("kafkastreamer.service.node");
>>>>>>
>>>>>>     return dataNode != null && dataNode;
>>>>>> }
>>>>>>
>>>>>>
>>>>>> I get a marshalling error, java.io.NotSerializableException:
>>>>>> org.apache.ignite.stream.kafka.KafkaStreamer, when using KafkaStreamer
>>>>>> in my Service:
>>>>>>
>>>>>> private KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>>>>>
>>>>>> I can only start the service with:
>>>>>>
>>>>>> private static KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>>>>>
>>>>>> Is it because Ignite is trying to transfer the KafkaStreamer instance
>>>>>> between nodes?
>>>>>>
>>>>>> Log:
>>>>>>
>>>>>> [2020-07-02 10:27:47,552][ERROR][main][IgniteServiceProcessor] Failed to marshal service with configured marshaller [name=KafkaStreamerService, srvc=services.kafkastreamer.KafkaStreamerService@3dedb4a6, marsh=JdkMarshaller [clsFilter=null]]
>>>>>> class org.apache.ignite.IgniteCheckedException: Failed to serialize object: services.kafkastreamer.KafkaStreamerService@3dedb4a6
>>>>>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:102)
>>>>>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:109)
>>>>>>     at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.marshal(AbstractNodeNameAwareMarshaller.java:57)
>>>>>>     at org.apache.ignite.internal.util.IgniteUtils.marshal(IgniteUtils.java:10386)
>>>>>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.prepareServiceConfigurations(IgniteServiceProcessor.java:583)
>>>>>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.staticallyConfiguredServices(IgniteServiceProcessor.java:1541)
>>>>>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.collectJoiningNodeData(IgniteServiceProcessor.java:354)
>>>>>>     at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$5.collect(GridDiscoveryManager.java:861)
>>>>>>     at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.collectExchangeData(TcpDiscoverySpi.java:2032)
>>>>>>     at org.apache.ignite.spi.discovery.tcp.ServerImpl.joinTopology(ServerImpl.java:1029)
>>>>>>     at org.apache.ignite.spi.discovery.tcp.ServerImpl.spiStart(ServerImpl.java:427)
>>>>>>     at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.spiStart(TcpDiscoverySpi.java:2099)
>>>>>>     at org.apache.ignite.internal.managers.GridManagerAdapter.startSpi(GridManagerAdapter.java:299)
>>>>>>     at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.start(GridDiscoveryManager.java:943)
>>>>>>     at org.apache.ignite.internal.IgniteKernal.startManager(IgniteKernal.java:1960)
>>>>>>     at org.apache.ignite.internal.IgniteKernal.start(IgniteKernal.java:1276)
>>>>>>     at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start0(IgnitionEx.java:2045)
>>>>>>     at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start(IgnitionEx.java:1703)
>>>>>>     at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1117)
>>>>>>     at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1035)
>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:921)
>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:820)
>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:690)
>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:659)
>>>>>>     at org.apache.ignite.Ignition.start(Ignition.java:346)
>>>>>>     at app.KafkaStreamerServiceNodeStartup.main(KafkaStreamerServiceNodeStartup.java:40)
>>>>>> Caused by: java.io.NotSerializableException: org.apache.ignite.stream.kafka.KafkaStreamer
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:97)
>>>>>>     ... 25 more
>>>>>>
>>>>>
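On the original question of why the instance field fails while the static field lets the service start: the trace shows JdkMarshaller going through java.io.ObjectOutputStream, and plain Java serialization writes instance fields but ignores static ones. The sketch below reproduces just that behaviour with the JDK alone. Heavy, InstanceFieldService and StaticFieldService are hypothetical names used for illustration, with Heavy standing in for the non-serializable KafkaStreamer.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class FieldSerializationSketch {
    /** Hypothetical stand-in for KafkaStreamer: does not implement Serializable. */
    static class Heavy { }

    /** Instance field: part of the serialized state, so serialization fails. */
    static class InstanceFieldService implements Serializable {
        private static final long serialVersionUID = 1L;
        private Heavy heavy = new Heavy();
    }

    /** Static field: not part of the serialized state, so serialization succeeds. */
    static class StaticFieldService implements Serializable {
        private static final long serialVersionUID = 1L;
        private static Heavy heavy = new Heavy();
    }

    /** Returns true if the object can be written with plain Java serialization. */
    static boolean canSerialize(Object o) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("instance field: " + canSerialize(new InstanceFieldService()));
        System.out.println("static field:   " + canSerialize(new StaticFieldService()));
    }
}
```

A static field only hides the problem, since the value never travels with the service and is shared by every instance in the JVM; a transient field assigned in init() avoids the marshalling error without those side effects.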
