Hi,

I have a use case where I want a fully copy of a replicated cache on a
subset of my thick clients.  In this example, I have an ETL thick client
that creates and updates a replicated cache in my server grid.  I then have
a series of webserver thick clients that I always want a fully up to date
copy of that cache.

NearCache Attempt:
=============
I tried using a NearCache on the webserver thick clients but this had two
undesireable problems:
* it created a near cache on each server node which I have no use for since
this is already an replicated cache
* the near cache never received new entries from the replicated cache, it
was only updated with entries it had already had stored in it.

Is there a way I can resolve either of these two undesireable problems of
the NearCache for my situation?


Continuous Query Attempt:
=================
This lead me to instead consider Continuous Queries (CQ) where I would have
each webserver maintain it's own Map of the server cache data where upon
startup it uses the CQ initial query to get the current server state on
startup and then uses the CQ local listener.  

Trying to get the CQ working I followed the examples in text in
https://ignite.apache.org/docs/latest/configuring-caches/near-cache however
I can only see the local listener updates if I run the query in my own
thread that I never let finish.

What am I doing wrong in the code below?

Continuous Query code:
---------------------------

// Create new continuous query.
val qry = ContinuousQuery<Int, String>()

// have it return all data in its initial query
// Setting an optional initial query.
qry.setInitialQuery(ScanQuery())

// don't set a remote filter as we want all data returned
// qry.setRemoteFilterFactory()

// Callback that is called locally when update notifications are received.
qry.setLocalListener { events ->
    println("Update notifications
starting...[thread=${Thread.currentThread()}]")
    for (e in events) {
        println("Listener event: [thread=${Thread.currentThread()},
key=${e.key}, val=${e.value}]")
    }
    println("Update notifications finished
[thread=${Thread.currentThread()}]")
}

val executor = Executors.newSingleThreadExecutor()
executor.submit {
    myCache.query(qry).use { cur ->

        // Iterating over initial query results
        println("Initial query cursor
starting...[thread=${Thread.currentThread()}]")
        for (e in cur) {
            println("Cursor value: [thread=${Thread.currentThread()},
key=${e.key}, val=${e.value}]")
        }
        println("Initial query cursor finished
[thread=${Thread.currentThread()}]")

        println("Starting holding continuous query cursor open so we can get
callback data... [thread=${Thread.currentThread()}]")
        val shouldBeRunning = true
        while (shouldBeRunning) {
            // hold this thread open forever so the local listener callback
can keep processing events
            try {
                Thread.sleep(5000)
            } catch (e: InterruptedException) {
                println("Continuous query sleep interrupted
[thread=${Thread.currentThread()}]")
                throw e
            }
        }
        println("Stopping holding continuous query cursor open because we
got shutdown signal [thread=${Thread.currentThread()}]")
    }
}





--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Reply via email to