Hi,
The configuration is as follows:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--
        Alter configuration below as needed.
    -->
    <bean class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="peerClassLoadingEnabled" value="true"/>

        <!-- Enabling Apache Ignite Persistent Store. -->
        <property name="dataStorageConfiguration">
            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                <property name="defaultDataRegionConfiguration">
                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                        <property name="persistenceEnabled" value="true"/>
                    </bean>
                </property>
            </bean>
        </property>

        <!-- This node joins the cluster as a client. -->
        <property name="clientMode" value="true"/>

        <!-- Node discovery through the ZooKeeper ensemble listed in zkConnectionString. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi">
                <property name="zkConnectionString" value="172.20.20.149:2181,172.20.20.150:2181,172.20.20.151:2181"/>
                <property name="sessionTimeout" value="30000"/>
                <property name="zkRootPath" value="/apacheIgnite"/>
                <property name="joinTimeout" value="10000"/>
            </bean>
        </property>
    </bean>
</beans>
The Kotlin code is as follows:
/**
 * Transaction benchmark against an Ignite cluster.
 *
 * Starts a node from `zk.xml`, activates the cluster, creates five REPLICATED /
 * TRANSACTIONAL caches, clears them, fills them with generated accounts, orders
 * and trade results, and then, for every combination of [TransactionConcurrency]
 * and [TransactionIsolation], processes each selected trade result in its own
 * transaction and prints the elapsed time and throughput of the batch.
 *
 * @param args `accountSize orderSize threadSize`: number of accounts to generate,
 *   number of orders to generate, and size of the worker thread pool.
 * @throws NumberFormatException if one of the three arguments is not an integer.
 */
fun main(args: Array<String>) {
    // All three positional arguments are read below, so an "is empty" check is not enough.
    if (args.size < 3) {
        println("pls set accountSize orderSize threadSize")
        return
    }

    // Parse everything up front so a malformed argument fails before a node is started.
    val accountSize = args[0].toInt()
    val orderSize = args[1].toInt()
    val threadSize = args[2].toInt()

    val executor = Executors.newFixedThreadPool(threadSize)
    val ignite = Ignition.start("zk.xml")
    try {
        ignite.cluster().active(true)

        // Every cache in this benchmark shares the same mode, atomicity and Long key type;
        // only the cache name and the indexed value type differ.
        fun <V> replicatedTxCache(name: String, valueType: Class<V>) =
            ignite.getOrCreateCache(
                CacheConfiguration<Long, V>(name).apply {
                    cacheMode = CacheMode.REPLICATED
                    atomicityMode = CacheAtomicityMode.TRANSACTIONAL
                    setIndexedTypes(Long::class.java, valueType)
                }
            )

        val accounts = replicatedTxCache(CACHE_ACCOUNT, Account::class.java)
        val orders = replicatedTxCache(CACHE_ORDER, Order::class.java)
        val positions = replicatedTxCache(CACHE_POSITION, Position::class.java)
        val tradeResults = replicatedTxCache(CACHE_MATCH_TRADE_RESULT, MatchTradeResult::class.java)
        val cancelResults = replicatedTxCache(CACHE_MATCH_CANCEL_RESULT, MatchCancelResult::class.java)

        // clean: start from empty caches
        orders.clear()
        tradeResults.clear()
        positions.clear()
        accounts.clear()
        cancelResults.clear()

        // init Account: seeded sequentially on the calling thread, so no latch is needed
        repeat(accountSize) {
            val account = AccountHelper.genAccount()
            accounts.put(account.id, account)
        }

        // init order: generated and stored in parallel on the worker pool
        val orderDownLatch = CountDownLatch(orderSize)
        repeat(orderSize) {
            executor.submit {
                try {
                    val order = OrderHelper.genOrder(accountSize)
                    val result = TradeResultHelper.genResult(order)
                    orders.put(order.id, order)
                    tradeResults.put(result.id, result)
                } finally {
                    // Count down even when the task fails; otherwise await() below never returns.
                    orderDownLatch.countDown()
                }
            }
        }
        orderDownLatch.await()

        // clear: select the trade results whose status equals CLEAR_STATUS_INIT
        val traded = tradeResults.query(
            SqlQuery<Long, MatchTradeResult>(MatchTradeResult::class.java, " status = ?")
                .setArgs(CLEAR_STATUS_INIT)
        ).all

        for (a in TransactionConcurrency.values()) {
            for (b in TransactionIsolation.values()) {
                val countDownLatch = CountDownLatch(traded.size)
                val begin = System.currentTimeMillis()
                for (item in traded) {
                    executor.submit {
                        try {
                            val result = item.value
                            var done = false
                            while (!done) {
                                try {
                                    ignite.transactions().txStart(a, b).use { tx ->
                                        // NOTE(review): the status is set to the same constant the
                                        // query above filtered on - confirm this is intended.
                                        result.status = CLEAR_STATUS_INIT
                                        val accountBuy = accounts.get(result.buyUserId)
                                        val accountSell = accounts.get(result.sellUserId)
                                        val positionBuy =
                                            PositionHelp.genPosition(result.buyUserId, result.contractId)
                                        val positionSell =
                                            PositionHelp.genPosition(result.sellUserId, result.contractId)
                                        // NOTE(review): if buyUserId == sellUserId the two reads may be
                                        // separate copies and the second put could overwrite the first
                                        // update - verify against the data generator.
                                        accountBuy?.let {
                                            accountBuy.balance -= result.amount
                                            accounts.put(accountBuy.id, accountBuy)
                                        }
                                        accountSell?.let {
                                            accountSell.balance += result.amount
                                            accounts.put(accountSell.id, accountSell)
                                        }
                                        tradeResults.put(result.id, result)
                                        positions.put(positionBuy.id, positionBuy)
                                        positions.put(positionSell.id, positionSell)
                                        tx.commit()
                                    }
                                    done = true
                                } catch (e: Exception) {
                                    // Deliberate best-effort: a failed transaction is retried until it
                                    // commits. NOTE(review): a permanent failure would loop forever.
                                }
                            }
                        } finally {
                            // Always release the latch so the measuring thread cannot hang.
                            countDownLatch.countDown()
                        }
                    }
                }
                countDownLatch.await()
                val batch = traded.size
                val duration = System.currentTimeMillis() - begin
                println("concurrency=$a isolation=$b size=$batch used ${duration}ms qps:${batch.toDouble() / duration.toDouble() * 1000}")
                // Pause between combinations, as in the original benchmark.
                TimeUnit.SECONDS.sleep(2)
            }
        }
    } finally {
        // Pool threads are non-daemon: without shutdown() the JVM never exits after main returns.
        executor.shutdown()
        // Stop the local node so its threads and cluster connection are released.
        ignite.close()
    }
}
在 2019/1/22 下午7:17, Ilya Kasnacheev 写道:
Hello!
Do you have a reproducer project?
Regards,
--
Ilya Kasnacheev
вт, 22 янв. 2019 г. в 13:31, 李玉珏@163 <[email protected]
<mailto:[email protected]>>:
Hi,
If so, is this a bug?