Well… I mean, if you really want to… :)

The issue is that Spark may start or stop its executors as it sees fit – or, at
least, without taking Ignite's lifecycle into account.
If an executor dies, the Ignite node running inside it dies with it. That doesn't
always lead to data loss, but it does lead to topology changes and rebalancing,
which are among the most costly operations for Ignite.
So each time Spark starts or stops an executor to make your computations more
efficient, it will actually cause Ignite to do a lot of extra work.

You can try using the embedded mode, but keep in mind the possible performance
drops when executors are requested or removed, the possible data loss (complete
or partial), and the fact that this mode is eventually going away. I'd advise
going with the standalone deployment.
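For comparison, here is a minimal standalone-mode configuration sketch (the host addresses are placeholder assumptions, not from this thread): each executor starts a *client* node that joins a separately deployed Ignite cluster, so Spark adding or removing executors never changes the data-holding topology.

```scala
import java.util.Arrays
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder

// Point discovery at the external, standalone Ignite servers
// (addresses below are placeholders).
val ipFinder = new TcpDiscoveryVmIpFinder()
ipFinder.setAddresses(Arrays.asList("ignite-host-1:47500..47509",
                                    "ignite-host-2:47500..47509"))
val spi = new TcpDiscoverySpi()
spi.setIpFinder(ipFinder)

// Client mode: this node holds no data partitions, so killing the
// executor causes no rebalance on the server side.
val cfg = new IgniteConfiguration()
cfg.setClientMode(true)
cfg.setDiscoverySpi(spi)
val ignite = Ignition.start(cfg)
```

Since the client node owns no partitions, its lifecycle can follow the executor's freely; only the standalone server nodes participate in rebalancing.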

Stan


From: mamaco
Sent: October 15, 2018, 21:27
To: [email protected]
Subject: RE: 'Deploy' ignite cluster into a spark cluster

Hi Stan,
Thank you for providing the information.
Yes, it's exactly the 'embedded deployment', but it looks like it has been
deprecated because of performance issues.

Stanislav Lukyanov wrote
> Embedded Mode Deprecation
> Embedded mode implies starting Ignite server nodes within Spark executors
> which can cause unexpected rebalancing or even data loss. Therefore this
> mode is currently deprecated and will be eventually discontinued. Consider
> starting a separate Ignite cluster and using standalone mode to avoid data
> consistency and performance issues.

Sorry for my carelessness, I didn't notice that information before; it almost
answers my question.
But what if I use an earlier Ignite (1.9 or older), ensure an on-heap cache, and
let Ignite handle the GC?
My case is a straightforward streaming process (local processing comes with
hash partitioning and affinity collocation to the Ignite cache).
Are there any further potential problems? Your suggestion is appreciated.

My concerns:
This approach makes deployment very easy. Compared to the old SharedRDD
solution, I don't have to persuade the DBA to initialize the Ignite cluster
node by node, or to deploy POJO classes. Collecting data from the 'cluster' to
the 'driver' and then to 'ignite' is also a real bottleneck, so node-to-node
local computing is preferable: all I need to do is ship the library to the
Spark cluster from the driver node.

If the Spark job stops, the Ignite cluster will be terminated and the data will
be lost (I saw this in the Spark executor logs). If that is the case, the
'risk' is acceptable to me; I don't care about the data loss issue. My project
receives concurrent Kafka traffic data using Spark Streaming, and I need Ignite
to act as a fast k/v dictionary to reduce the deduplication computation (before
saving to HBase); in the meantime the Ignite cache provides a SQL
query/monitoring service for curious users.
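The fast k/v deduplication idea can be sketched in plain Scala. A ConcurrentHashMap stands in here for the Ignite cache: its putIfAbsent returns null only for the first writer of a key, and IgniteCache.putIfAbsent offers the same first-writer-wins contract (returning a boolean), so an event would be forwarded to HBase only on first sight.

```scala
import java.util.concurrent.ConcurrentHashMap

// Stand-in for the Ignite cache: putIfAbsent returns null only for the
// first writer of a key, giving first-writer-wins deduplication.
val seen = new ConcurrentHashMap[String, String]()

def firstSeen(key: String): Boolean =
  seen.putIfAbsent(key, "seen") == null

// Simulated incoming Kafka keys with duplicates.
val incoming = Seq("k1", "k2", "k1", "k3", "k2")
val deduped  = incoming.filter(firstSeen)

println(deduped)  // List(k1, k2, k3)
```

On the real cluster the map would be replaced by the IgniteCache instance, which makes the check atomic across all nodes rather than per-JVM.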

Given the information above, do you have any suggestions?
We had been confused about how to integrate a bunch of open source projects
until I found that Apache Spark has a big heart and supplies everything.

Your help is appreciated.

Marco


Code:
# Ship the Ignite 1.9 libraries to the Spark cluster
/spark-shell --master yarn --conf spark.ui.enabled=true --deploy-mode client \
  --num-executors 2 \
  --jars "/opt/ignite/assembly/1.9/Ignite-1.9-jar-with-dependencies.jar"


class LazyAgent(isClient: Boolean) extends Serializable {
    // Lazy vals: the Ignite node is started on first access,
    // i.e. once per executor JVM after the broadcast is deserialized.
    lazy val conf = ic_conf()
    lazy val ignite = org.apache.ignite.Ignition.start(conf)
    lazy val cache = ignite.getOrCreateCache(gen_cache("test"))

    def ic_conf(): org.apache.ignite.configuration.IgniteConfiguration = {
        val spi = new org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi()
        spi.setLocalPortRange(10)
        // JDBC-based IP finder: nodes register themselves in a shared MySQL table
        val ipFinder = new org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder()
        ipFinder.setDataSource(JDBCDataSource())
        spi.setIpFinder(ipFinder)
        val cfg = new org.apache.ignite.configuration.IgniteConfiguration()
        cfg.setClientMode(isClient)
        cfg.setDiscoverySpi(spi)
        cfg
    }

    def JDBCDataSource(): javax.sql.DataSource = {
        val dataSource = new com.mysql.jdbc.jdbc2.optional.MysqlDataSource()
        dataSource.setServerName({hostip})
        ......
        dataSource
    }

    def gen_cache(cache_name: String): org.apache.ignite.configuration.CacheConfiguration[String, String] = {
        val cache_cfg = new org.apache.ignite.configuration.CacheConfiguration[String, String](cache_name)
        ......
        cache_cfg
    }
}

//on driver:
val instance = sc.broadcast(new LazyAgent(false))

//on data node:
rdd.foreachPartition(iter => {
    // On each executor, the lazy vals ensure the Ignite node is created only once
    val cache = instance.value.cache
    iter.foreach(el => {
        cache.put("k" + el.toString(), "value of [" + el.toString() + "]")
    })
})
