Hi,

I am loading caches from Oracle tables. To do that, I am
overriding the CacheStoreAdapter.loadCache method.
This method accepts an IgniteBiInClosure<K, V> parameter.

Is the IgniteBiInClosure instance thread safe?

I am doing partition-aware loading and have written the code shown below.
Basically, I am trying to load the data in parallel using an executor service.
The data loading tasks all use the same IgniteBiInClosure instance to
push data to the cache.

Can someone please confirm whether it is OK to use the IgniteBiInClosure
instance from multiple threads?
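
If the closure turns out not to be safe to share, one fallback would be to serialize access to it. The following is only a minimal sketch of that idea, not Ignite code: java.util.function.BiConsumer stands in for IgniteBiInClosure, a plain HashMap stands in for the cache, and the names SharedClosureSketch, serialized and loadInParallel are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

public class SharedClosureSketch {

    /** Wraps a sink so that only one thread at a time can call accept() on it. */
    static <K, V> BiConsumer<K, V> serialized(BiConsumer<K, V> delegate) {
        final Object lock = new Object();
        return (k, v) -> {
            synchronized (lock) {
                delegate.accept(k, v);
            }
        };
    }

    /** Runs `tasks` workers in parallel; each pushes `count` entries into the shared sink. */
    static int loadInParallel(BiConsumer<Integer, String> sink, int tasks, int count)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int t = 0; t < tasks; t++) {
                final int base = t * count; // each worker gets a disjoint key range
                futures.add(pool.submit(() -> {
                    for (int i = 0; i < count; i++) {
                        sink.accept(base + i, "row-" + (base + i));
                    }
                    return count;
                }));
            }
            int total = 0;
            for (Future<Integer> f : futures) {
                total += f.get(); // wait for every worker and sum what was pushed
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // HashMap is deliberately not thread safe; the wrapper serializes access to it.
        Map<Integer, String> target = new HashMap<>();
        BiConsumer<Integer, String> unsafeSink = target::put;
        int pushed = loadInParallel(serialized(unsafeSink), 4, 1000);
        System.out.println(pushed + " pushed, " + target.size() + " stored");
    }
}
```

The lock removes any parallelism in the push itself, so this would only make sense if reading from the database dominates the load time.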

/**
 * Default empty implementation. This method needs to be overridden only if
 * {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)} method is
 * explicitly called.
 *
 * @param clo {@inheritDoc}
 * @param args {@inheritDoc}
 */
@Override
public void loadCache(IgniteBiInClosure<K, V> clo, Object... args) {

  LOGGER.info("{} loading started...", this.cacheName);
  StopWatch loaderWatch = new StopWatch("Loader");
  loaderWatch.start();

  DataSource dataSource = (DataSource) springCtx.getBean(DataSource.class);
  ExecutorService executorService = getExecutorService();

  loadPartitions(clo, getPrimaryParitionIdsLocalToNode(), dataSource, executorService);
  loadPartitions(clo, getBackupParitionIdsLocalToNode(), dataSource, executorService);

  shutDownExecutorService(executorService);

  loaderWatch.stop();
  LOGGER.info("{} loading completed in {} ", this.cacheName, loaderWatch.getTotalTimeMillis());
}

private void loadPartitions(IgniteBiInClosure<K, V> clo, List<Integer> partitionIds,
    DataSource dataSource, ExecutorService executorService) {

  StopWatch loaderWatch = new StopWatch();
  loaderWatch.start();

  List<Future<Boolean>> futures = new ArrayList<>();

  for (List<Integer> partitionIdSeg : Lists.partition(partitionIds, batchsize)) {
    Future<Boolean> futureTask = executorService.submit(
        new DefaultDataLoaderTask(dataSource, rowMapper, loadAllDataSql, clo, partitionIdSeg));
    futures.add(futureTask);
  }

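  // Wait for all tasks to complete
  for (Future<Boolean> future : futures) {
    try {
      future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      throw new CacheLoaderException(e.getMessage(), e);
    } catch (ExecutionException e) {
      throw new CacheLoaderException(e.getMessage(), e);
    }
  }
  loaderWatch.stop(); // stop the watch so the elapsed time is recorded before it is read
  LOGGER.info("{} partitions loaded in {} ", this.cacheName, loaderWatch.getTotalTimeMillis());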
}


public class DefaultDataLoaderTask implements Callable<Boolean> {

  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDataLoaderTask.class);
  private final DataSource dataSource;
  private final String loadAllDataSql;
  private final RowMapper<? extends Data> rowMapper;
  private final List<Integer> partitionIds;
  private final IgniteBiInClosure clo;

  public DefaultDataLoaderTask(DataSource dataSource, RowMapper<? extends Data> rowMapper,
      String loadAllDataSql, IgniteBiInClosure clo, List<Integer> partitionIds) {
    this.dataSource = dataSource;
    this.loadAllDataSql = loadAllDataSql;
    this.rowMapper = rowMapper;
    this.partitionIds = partitionIds;
    this.clo = clo;
  }

  @Override
  public Boolean call() throws Exception {

    String selectSql = loadAllDataSql + getInClauseForPreparedStmt();
    StopWatch taskWatch = new StopWatch("TaskWatch");
    taskWatch.start();
    try (Connection connection = dataSource.getConnection();
        PreparedStatement preparedStatement = connection.prepareStatement(selectSql)) {

      int idx = 1;
      for (Integer partitionId : partitionIds) {
        preparedStatement.setInt(idx, partitionId);
        idx++;
      }

      try (ResultSet resultSet = preparedStatement.executeQuery()) {
        while (resultSet.next()) {
          final Data data = rowMapper.mapRow(resultSet, 0);
          final DataKey key = data.getKey();
          clo.apply(key, data);
        }
      }
    }
    taskWatch.stop();
    LOGGER.info("DefaultDataLoaderTask completed in {} ms", taskWatch.getTotalTimeMillis());

    return Boolean.TRUE;
  }




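The getInClauseForPreparedStmt() helper called in call() is not included above. Purely as an illustration of how the "?" placeholders for the partition ids could be generated, here is a hypothetical sketch; the class and method names are made up, and the real helper presumably also adds the column name and the IN keyword.

```java
import java.util.Collections;

public class InClauseSketch {

    /**
     * Builds a parenthesized list of JDBC placeholders, one per bound value.
     * Rejects counts below 1 because an empty IN list is not valid SQL.
     */
    static String placeholders(int count) {
        if (count < 1) {
            throw new IllegalArgumentException("count must be at least 1, got " + count);
        }
        return "(" + String.join(", ", Collections.nCopies(count, "?")) + ")";
    }

    public static void main(String[] args) {
        // One placeholder per partition id in the batch.
        System.out.println(placeholders(3));
    }
}
```

Since Oracle rejects IN lists with more than 1000 expressions (ORA-01795), the batch size has to stay at or below that limit.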
Thanks,
Prasad
