Hello!

I have just created a sample project with your code (running two standalone nodes with peerAssemblyLoadingMode="CurrentAppDomain", plus one client with the project code), and it runs just fine. Both the Hello World closure and the filter are executed without errors.
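
For reference, a minimal app.config sketch with that mode enabled might look like the fragment below. It is an illustration under assumptions, not the exact file from the test project: it assumes the node is started with Ignition.StartFromApplicationConfiguration() and uses the standard Ignite.NET configuration section; everything other than the peerAssemblyLoadingMode attribute is left at defaults.

```xml
<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <configSections>
    <!-- Registers the Ignite.NET section read by Ignition.StartFromApplicationConfiguration() -->
    <section name="igniteConfiguration"
             type="Apache.Ignite.Core.IgniteConfigurationSection, Apache.Ignite.Core" />
  </configSections>

  <!-- Assumption: all other settings (discovery, caches, etc.) stay at their defaults -->
  <igniteConfiguration xmlns="http://ignite.apache.org/schema/dotnet/IgniteConfigurationSection"
                       peerAssemblyLoadingMode="CurrentAppDomain" />
</configuration>
```

As far as I know, the mode has to be enabled on the server nodes as well as on the client, so it is worth checking the configuration of every node.
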
This is slightly puzzling to me, as I would expect cache key-value types NOT to be peer loaded, but still, it works. Please share a reproducer project if it still fails for you after you have checked your configurations.

Regards,
--
Ilya Kasnacheev

2018-08-09 17:58 GMT+03:00 Som Som <[email protected]>:

> In the first example I did not deploy the HelloAction class manually, but it
> works correctly, which means that the class was successfully transferred and
> deployed. But why does this not work in the case of the EmployeeEventFilter
> class? Is it an error?
>
> Thu, Aug 9, 2018, 15:46 Ilya Kasnacheev <[email protected]>:
>
>> Hello!
>>
>> I'm not sure that C# has peer class loading. Are you sure that you have
>> this filter's code deployed on your server nodes?
>>
>> Regards,
>>
>> --
>> Ilya Kasnacheev
>>
>> 2018-08-09 15:23 GMT+03:00 Som Som <[email protected]>:
>>
>>> Is there any information?
>>>
>>> ---------- Forwarded message ---------
>>> From: Som Som <[email protected]>
>>> Date: Wed, Aug 8, 2018, 16:46
>>> Subject: continous query remote filter issue
>>> To: <[email protected]>
>>>
>>> Hello.
>>>
>>> It looks like the peerAssemblyLoadingMode flag doesn’t work correctly in
>>> the case of CacheEntryEventFilter.
>>>
>>> As an example:
>>>
>>> 1) This code works fine and I see “Hello world” on the server console.
>>> It means that the HelloAction class was successfully transferred to the
>>> server.
>>>
>>> class Program
>>> {
>>>     static void Main(string[] args)
>>>     {
>>>         using (var ignite = Ignition.StartFromApplicationConfiguration())
>>>         {
>>>             var remotes = ignite.GetCluster().ForRemotes();
>>>             remotes.GetCompute().Broadcast(new HelloAction());
>>>         }
>>>     }
>>>
>>>     class HelloAction : IComputeAction
>>>     {
>>>         public void Invoke()
>>>         {
>>>             Console.WriteLine("Hello, World!");
>>>         }
>>>     }
>>> }
>>>
>>> 2) But this code, which sends the filter class to the remote server
>>> node, generates an error, and I receive 4 Employee entries instead of
>>> the expected 2:
>>>
>>> class Program
>>> {
>>>     public class Employee
>>>     {
>>>         public Employee(string name, long salary)
>>>         {
>>>             Name = name;
>>>             Salary = salary;
>>>         }
>>>
>>>         [QuerySqlField]
>>>         public string Name { get; set; }
>>>
>>>         [QuerySqlField]
>>>         public long Salary { get; set; }
>>>
>>>         public override string ToString()
>>>         {
>>>             return string.Format("{0} [name={1}, salary={2}]",
>>>                 typeof(Employee).Name, Name, Salary);
>>>         }
>>>     }
>>>
>>>     class EmployeeEventListener : ICacheEntryEventListener<int, Employee>
>>>     {
>>>         public void OnEvent(IEnumerable<ICacheEntryEvent<int, Employee>> evts)
>>>         {
>>>             foreach (var evt in evts)
>>>                 Console.WriteLine(evt.Value);
>>>         }
>>>     }
>>>
>>>     class EmployeeEventFilter : ICacheEntryEventFilter<int, Employee>
>>>     {
>>>         public bool Evaluate(ICacheEntryEvent<int, Employee> evt)
>>>         {
>>>             return evt.Value.Salary > 5000;
>>>         }
>>>     }
>>>
>>>     static void Main(string[] args)
>>>     {
>>>         using (var ignite = Ignition.StartFromApplicationConfiguration())
>>>         {
>>>             var employeeCache = ignite.GetOrCreateCache<int, Employee>(
>>>                 new CacheConfiguration("employee",
>>>                     new QueryEntity(typeof(int), typeof(Employee)))
>>>                 { SqlSchema = "PUBLIC" });
>>>
>>>             var query = new ContinuousQuery<int, Employee>(new EmployeeEventListener())
>>>             {
>>>                 Filter = new EmployeeEventFilter()
>>>             };
>>>
>>>             var queryHandle = employeeCache.QueryContinuous(query);
>>>
>>>             employeeCache.Put(1, new Employee("James Wilson", 1000));
>>>             employeeCache.Put(2, new Employee("Daniel Adams", 2000));
>>>             employeeCache.Put(3, new Employee("Cristian Moss", 7000));
>>>             employeeCache.Put(4, new Employee("Allison Mathis", 8000));
>>>
>>>             Console.WriteLine("Press any key...");
>>>             Console.ReadKey();
>>>         }
>>>     }
>>> }
>>>
>>> Server node console output:
>>>
>>> [16:26:33]    __________  ________________
>>> [16:26:33]   /  _/ ___/ |/ /  _/_  __/ __/
>>> [16:26:33]  _/ // (7 7    // /  / / / _/
>>> [16:26:33] /___/\___/_/|_/___/ /_/ /___/
>>> [16:26:33]
>>> [16:26:33] ver. 2.7.0.20180721#19700101-sha1:DEV
>>> [16:26:33] 2018 Copyright(C) Apache Software Foundation
>>> [16:26:33]
>>> [16:26:33] Ignite documentation: http://ignite.apache.org
>>> [16:26:33]
>>> [16:26:33] Quiet mode.
>>> [16:26:33]   ^-- Logging to file 'C:\Ignite\apache-ignite-fabric-2.7.0.20180721-bin\work\log\ignite-b1061a07.0.log'
>>> [16:26:33]   ^-- Logging by 'JavaLogger [quiet=true, config=null]'
>>> [16:26:33]   ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or "-v" to ignite.{sh|bat}
>>> [16:26:33]
>>> [16:26:33] OS: Windows Server 2016 10.0 amd64
>>> [16:26:33] VM information: Java(TM) SE Runtime Environment 1.8.0_161-b12 Oracle Corporation Java HotSpot(TM) 64-Bit Server VM 25.161-b12
>>> [16:26:33] Please set system property '-Djava.net.preferIPv4Stack=true' to avoid possible problems in mixed environments.
>>> [16:26:33] Configured plugins:
>>> [16:26:33]   ^-- None
>>> [16:26:33]
>>> [16:26:33] Configured failure handler: [hnd=StopNodeOrHaltFailureHandler [tryStop=false, timeout=0]]
>>> [16:26:33] Message queue limit is set to 0 which may lead to potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes due to message queues growth on sender and receiver sides.
>>> [16:26:33] Security status [authentication=off, tls/ssl=off]
>>> [16:26:35] Performance suggestions for grid (fix if possible)
>>> [16:26:35] To disable, set -DIGNITE_PERFORMANCE_SUGGESTIONS_DISABLED=true
>>> [16:26:35]   ^-- Enable G1 Garbage Collector (add '-XX:+UseG1GC' to JVM options)
>>> [16:26:35]   ^-- Specify JVM heap max size (add '-Xmx<size>[g|G|m|M|k|K]' to JVM options)
>>> [16:26:35]   ^-- Set max direct memory size if getting 'OOME: Direct buffer memory' (add '-XX:MaxDirectMemorySize=<size>[g|G|m|M|k|K]' to JVM options)
>>> [16:26:35]   ^-- Disable processing of calls to System.gc() (add '-XX:+DisableExplicitGC' to JVM options)
>>> [16:26:35] Refer to this page for more performance suggestions: https://apacheignite.readme.io/docs/jvm-and-system-tuning
>>> [16:26:35]
>>> [16:26:35] To start Console Management & Monitoring run ignitevisorcmd.{sh|bat}
>>> [16:26:35]
>>> [16:26:35] Ignite node started OK (id=b1061a07)
>>> [16:26:35] Topology snapshot [ver=1, servers=1, clients=0, CPUs=8, offheap=6.4GB, heap=7.1GB]
>>> [16:26:35]   ^-- Node [id=B1061A07-4404-47BE-8EB4-160F6D174BF4, clusterState=ACTIVE]
>>> [16:26:35] Data Regions Configured:
>>> [16:26:35]   ^-- default [initSize=256.0 MiB, maxSize=6.4 GiB, persistenceEnabled=false]
>>> [16:26:42] Topology snapshot [ver=2, servers=1, clients=1, CPUs=12, offheap=6.4GB, heap=11.0GB]
>>> [16:26:42]   ^-- Node [id=B1061A07-4404-47BE-8EB4-160F6D174BF4, clusterState=ACTIVE]
>>> [16:26:42] Data Regions Configured:
>>> [16:26:42]   ^-- default [initSize=256.0 MiB, maxSize=6.4 GiB, persistenceEnabled=false]
>>> [16:26:44,523][SEVERE][sys-stripe-1-#2][] Failure in Java callback
>>> class org.apache.ignite.IgniteException: Platform error:Apache.Ignite.Core.Binary.BinaryObjectException: No matching type found for object [typeId=-1369457415, typeName=IngiteExample.Program+EmployeeEventFilter]. This usually indicates that assembly with specified type is not loaded on a node. When using Apache.Ignite.exe, make sure to load assemblies with -assembly parameter. Alternatively, set IgniteConfiguration.PeerAssemblyLoadingEnabled to true.
>>>    at Apache.Ignite.Core.Impl.Binary.BinaryReader.ReadFullObject[T](Int32 pos, Type typeOverride)
>>>    at Apache.Ignite.Core.Impl.Binary.BinaryReader.TryDeserialize[T](T& res, Type typeOverride)
>>>    at Apache.Ignite.Core.Impl.Binary.BinaryReader.Deserialize[T](Type typeOverride)
>>>    at Apache.Ignite.Core.Impl.Cache.Query.Continuous.ContinuousQueryFilterHolder..ctor(IBinaryRawReader reader)
>>>    at Apache.Ignite.Core.Impl.Binary.Marshaller.<AddSystemTypes>b__d(BinaryReader r)
>>>    at Apache.Ignite.Core.Impl.Binary.BinarySystemTypeSerializer`1.ReadBinary[T1](BinaryReader reader, IBinaryTypeDescriptor desc, Int32 pos, Type typeOverride)
>>>    at Apache.Ignite.Core.Impl.Binary.BinaryReader.ReadFullObject[T](Int32 pos, Type typeOverride)
>>>    at Apache.Ignite.Core.Impl.Binary.BinaryReader.TryDeserialize[T](T& res, Type typeOverride)
>>>    at Apache.Ignite.Core.Impl.Binary.BinaryReader.Deserialize[T](Type typeOverride)
>>>    at Apache.Ignite.Core.Impl.Binary.BinaryReader.ReadBinaryObject[T](Boolean doDetach)
>>>    at Apache.Ignite.Core.Impl.Binary.BinaryReader.TryDeserialize[T](T& res, Type typeOverride)
>>>    at Apache.Ignite.Core.Impl.Binary.BinaryReader.Deserialize[T](Type typeOverride)
>>>    at Apache.Ignite.Core.Impl.Unmanaged.UnmanagedCallbacks.ContinuousQueryFilterCreate(Int64 memPtr)
>>>    at Apache.Ignite.Core.Impl.Unmanaged.UnmanagedCallbacks.InLongOutLong(Int32 type, Int64 val)
>>>         at org.apache.ignite.internal.processors.platform.PlatformProcessorImpl.loggerLog(PlatformProcessorImpl.java:403)
>>>         at org.apache.ignite.internal.processors.platform.PlatformProcessorImpl.processInStreamOutLong(PlatformProcessorImpl.java:459)
>>>         at org.apache.ignite.internal.processors.platform.PlatformProcessorImpl.processInStreamOutLong(PlatformProcessorImpl.java:511)
>>>         at org.apache.ignite.internal.processors.platform.PlatformTargetProxyImpl.inStreamOutLong(PlatformTargetProxyImpl.java:67)
>>>         at org.apache.ignite.internal.processors.platform.callback.PlatformCallbackUtils.inLongOutLong(Native Method)
>>>         at org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway.continuousQueryFilterCreate(PlatformCallbackGateway.java:394)
>>>         at org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryRemoteFilter.deploy(PlatformContinuousQueryRemoteFilter.java:124)
>>>         at org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryRemoteFilter.evaluate(PlatformContinuousQueryRemoteFilter.java:84)
>>>         at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.filter(CacheContinuousQueryHandler.java:833)
>>>         at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$2.onEntryUpdated(CacheContinuousQueryHandler.java:422)
>>>         at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.onEntryUpdated(CacheContinuousQueryManager.java:399)
>>>         at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.innerUpdate(GridCacheMapEntry.java:1923)
>>>         at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateSingle(GridDhtAtomicCache.java:2436)
>>>         at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.update(GridDhtAtomicCache.java:1898)
>>>         at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal0(GridDhtAtomicCache.java:1740)
>>>         at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal(GridDhtAtomicCache.java:1630)
>>>         at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.processNearAtomicUpdateRequest(GridDhtAtomicCache.java:3071)
>>>         at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.access$400(GridDhtAtomicCache.java:133)
>>>         at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$5.apply(GridDhtAtomicCache.java:269)
>>>         at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$5.apply(GridDhtAtomicCache.java:264)
>>>         at org.apache.ignite.internal.processors.cache.GridCacheIoManager.processMessage(GridCacheIoManager.java:1056)
>>>         at org.apache.ignite.internal.processors.cache.GridCacheIoManager.onMessage0(GridCacheIoManager.java:581)
>>>         at org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:380)
>>>         at org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:306)
>>>         at org.apache.ignite.internal.processors.cache.GridCacheIoManager.access$100(GridCacheIoManager.java:101)
>>>         at org.apache.ignite.internal.processors.cache.GridCacheIoManager$1.onMessage(GridCacheIoManager.java:295)
>>>         at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1556)
>>>         at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1184)
>>>         at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:125)
>>>         at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1091)
>>>         at org.apache.ignite.internal.util.StripedExecutor$Stripe.body(StripedExecutor.java:496)
>>>         at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> [16:26:44,538][SEVERE][sys-stripe-1-#2][query] CacheEntryEventFilter failed: javax.cache.event.CacheEntryListenerException: Failed to deploy the filter.
>>> [16:26:44,554][SEVERE][sys-stripe-2-#3][query] CacheEntryEventFilter failed: javax.cache.event.CacheEntryListenerException: Failed to deploy the filter because it has been closed.
>>> [16:26:44,554][SEVERE][sys-stripe-3-#4][query] CacheEntryEventFilter failed: javax.cache.event.CacheEntryListenerException: Failed to deploy the filter because it has been closed.
>>> [16:26:44,570][SEVERE][sys-stripe-4-#5][query] CacheEntryEventFilter failed: javax.cache.event.CacheEntryListenerException: Failed to deploy the filter because it has been closed.
