Yes, it's thread-safe, and you can implement a multi-threaded cache store.

Evgeni

2018-08-01 18:15 GMT+03:00 Prasad Bhalerao <[email protected]>:

> Hi,
>
> I am loading caches from Oracle tables. For that I am
> overriding the CacheStoreAdapter.loadCache method.
> This method accepts a parameter of type IgniteBiInClosure<K, V>.
>
> Is the IgniteBiInClosure instance thread-safe?
>
> I am doing partition-aware loading and I have written the code shown below.
> Basically, I am trying to load the data in parallel using an executor service.
> These data loading tasks use the same IgniteBiInClosure instance to
> push data to the cache.
>
> Can someone please confirm whether it is OK to use an IgniteBiInClosure instance in
> a multithreaded fashion?
>
> /**
>  * Default empty implementation. This method needs to be overridden only if 
> {@link
>  * IgniteCache#loadCache(IgniteBiPredicate, Object...)} method is explicitly 
> called.
>  *
>  * @param clo {@inheritDoc}
>  * @param args {@inheritDoc}
>  */
> @Override
> public void loadCache(IgniteBiInClosure<K, V> clo, Object... args) {
>
>   LOGGER.info("{} loading started...", this.cacheName);
>   StopWatch loaderWatch = new StopWatch("Loader");
>   loaderWatch.start();
>
>   DataSource dataSource = (DataSource) springCtx.getBean(DataSource.class);
>   ExecutorService executorService = getExecutorService();
>
>   loadPartitions(clo, getPrimaryParitionIdsLocalToNode(), dataSource, 
> executorService);
>   loadPartitions(clo, getBackupParitionIdsLocalToNode(), dataSource, 
> executorService);
>
>   shutDownExecutorService(executorService);
>
>   loaderWatch.stop();
>   LOGGER.info("{} loading completed in {} ", this.cacheName, 
> loaderWatch.getTotalTimeMillis());
> }
>
> private void loadPartitions(IgniteBiInClosure<K, V> clo, List<Integer> 
> partitionIds, DataSource dataSource,
>
>     ExecutorService executorService) {
>
>   StopWatch loaderWatch = new StopWatch();
>   loaderWatch.start();
>
>   List<Future> futures = new ArrayList<>();
>
>   for (List<Integer> partitionIdSeg : Lists.partition(partitionIds, 
> batchsize)) {
>     Future<Boolean> futureTask = executorService
>         .submit(new DefaultDataLoaderTask(dataSource, rowMapper, 
> loadAllDataSql, clo, partitionIdSeg));
>     futures.add(futureTask);
>   }
>
>   //Wait for tasks to complete
>   for (Future<Boolean> future : futures) {
>     try {
>       future.get();
>     } catch (InterruptedException | ExecutionException e) {
>       throw new CacheLoaderException(e.getMessage(), e);
>     }
>   }
>   LOGGER.info("{} partitions loaded in {} ", this.cacheName, 
> loaderWatch.getTotalTimeMillis());
> }
>
>
> public class DefaultDataLoaderTask implements Callable<Boolean> {
>
>   private static final Logger LOGGER = 
> LoggerFactory.getLogger(DefaultDataLoaderTask.class);
>   private final DataSource dataSource;
>   private final String loadAllDataSql;
>   private final RowMapper<? extends Data> rowMapper;
>   private final List<Integer> partitionIds;
>   private final IgniteBiInClosure clo;
>
>   public DefaultDataLoaderTask(DataSource dataSource, RowMapper<? extends 
> Data> rowMapper, String loadAllDataSql,
>       IgniteBiInClosure clo, List<Integer> partitionIds) {
>     this.dataSource = dataSource;
>     this.loadAllDataSql = loadAllDataSql;
>     this.rowMapper = rowMapper;
>     this.partitionIds = partitionIds;
>     this.clo = clo;
>   }
>
>   @Override
>   public Boolean call() throws Exception {
>
>     String selectSql = loadAllDataSql + getInClauseForPreparedStmt();
>     StopWatch taskWatch = new StopWatch("TaskWatch");
>     taskWatch.start();
>     try (Connection connection = dataSource.getConnection();
>         PreparedStatement preparedStatement = 
> connection.prepareStatement(selectSql)) {
>
>       int idx = 1;
>       for (Integer partitionId : partitionIds) {
>         preparedStatement.setInt(idx, partitionId);
>         idx++;
>       }
>
>       try (ResultSet resultSet = preparedStatement.executeQuery()) {
>         while (resultSet.next()) {
>           final Data data = rowMapper.mapRow(resultSet, 0);
>           final DataKey key = data.getKey();
>           clo.apply(key, data);
>         }
>       }
>     }
>     taskWatch.stop();
>     LOGGER.info("DefaultDataLoaderTask completed in {} ms", 
> taskWatch.getTotalTimeMillis());
>
>     return Boolean.TRUE;
>   }
>
>
>
>
> Thanks,
> Prasad
>

Reply via email to