Yes, it's thread-safe, and you can implement a multi-threaded cache store. Evgeni
2018-08-01 18:15 GMT+03:00 Prasad Bhalerao <[email protected]>: > Hi, > > I am loading caches from oracle tables. For that I am > overriding CacheStoreAdapter.loadCache method. > This method accepts parameter IgniteBiInClosure<K, V>. > > Is the instance of IgniteBiInClosure instance thread safe? > > I am doing partition aware loading and I have written code as shown below. > Basically I am trying to load the data parallel using executor service. > This data loading tasks are using the same ignitebinclosure instance to > push data to cache. > > Can some please confirm if it is ok to use IgniteBiInClosure instance in > multithreaded fashion? > > /** > * Default empty implementation. This method needs to be overridden only if > {@link > * IgniteCache#loadCache(IgniteBiPredicate, Object...)} method is explicitly > called. > * > * @param clo {@inheritDoc} > * @param args {@inheritDoc} > */ > @Override > public void loadCache(IgniteBiInClosure<K, V> clo, Object... args) { > > LOGGER.info("{} loading started...", this.cacheName); > StopWatch loaderWatch = new StopWatch("Loader"); > loaderWatch.start(); > > DataSource dataSource = (DataSource) springCtx.getBean(DataSource.class); > ExecutorService executorService = getExecutorService(); > > loadPartitions(clo, getPrimaryParitionIdsLocalToNode(), dataSource, > executorService); > loadPartitions(clo, getBackupParitionIdsLocalToNode(), dataSource, > executorService); > > shutDownExecutorService(executorService); > > loaderWatch.stop(); > LOGGER.info("{} loading completed in {} ", this.cacheName, > loaderWatch.getTotalTimeMillis()); > } > > private void loadPartitions(IgniteBiInClosure<K, V> clo, List<Integer> > partitionIds, DataSource dataSource, > > ExecutorService executorService) { > > StopWatch loaderWatch = new StopWatch(); > loaderWatch.start(); > > List<Future> futures = new ArrayList<>(); > > for (List<Integer> partitionIdSeg : Lists.partition(partitionIds, > batchsize)) { > Future<Boolean> futureTask = 
executorService > .submit(new DefaultDataLoaderTask(dataSource, rowMapper, > loadAllDataSql, clo, partitionIdSeg)); > futures.add(futureTask); > } > > //Wait for tasks to complete > for (Future<Boolean> future : futures) { > try { > future.get(); > } catch (InterruptedException | ExecutionException e) { > throw new CacheLoaderException(e.getMessage(), e); > } > } > LOGGER.info("{} partitions loaded in {} ", this.cacheName, > loaderWatch.getTotalTimeMillis()); > } > > > public class DefaultDataLoaderTask implements Callable<Boolean> { > > private static final Logger LOGGER = > LoggerFactory.getLogger(DefaultDataLoaderTask.class); > private final DataSource dataSource; > private final String loadAllDataSql; > private final RowMapper<? extends Data> rowMapper; > private final List<Integer> partitionIds; > private final IgniteBiInClosure clo; > > public DefaultDataLoaderTask(DataSource dataSource, RowMapper<? extends > Data> rowMapper, String loadAllDataSql, > IgniteBiInClosure clo, List<Integer> partitionIds) { > this.dataSource = dataSource; > this.loadAllDataSql = loadAllDataSql; > this.rowMapper = rowMapper; > this.partitionIds = partitionIds; > this.clo = clo; > } > > @Override > public Boolean call() throws Exception { > > String selectSql = loadAllDataSql + getInClauseForPreparedStmt(); > StopWatch taskWatch = new StopWatch("TaskWatch"); > taskWatch.start(); > try (Connection connection = dataSource.getConnection(); > PreparedStatement preparedStatement = > connection.prepareStatement(selectSql)) { > > int idx = 1; > for (Integer partitionId : partitionIds) { > preparedStatement.setInt(idx, partitionId); > idx++; > } > > try (ResultSet resultSet = preparedStatement.executeQuery()) { > while (resultSet.next()) { > final Data data = rowMapper.mapRow(resultSet, 0); > final DataKey key = data.getKey(); > clo.apply(key, data); > } > } > } > taskWatch.stop(); > LOGGER.info("DefaultDataLoaderTask completed in {} ms", > taskWatch.getTotalTimeMillis()); > > return 
Boolean.TRUE; > } > > > > > Thanks, > Prasad >
