Hi, I asked for the closure that is passed to the CacheStore.loadCache(closure, ..) method.
On Fri, Feb 24, 2017 at 9:31 PM, diopek <[email protected]> wrote: > package myignite.loading.test.cache.store; > > import static myignite.loading.test.common.CommonUtils.CONFIG_DIR; > import static myignite.loading.test.common.CommonUtils.DATA_SRC_PWD; > import static myignite.loading.test.common.CommonUtils.DATA_SRC_URL; > import static myignite.loading.test.common.CommonUtils.DATA_SRC_USR; > import static myignite.loading.test.common.CommonUtils.RWA_SQL_FETCH_SIZE; > import static > myignite.loading.test.common.CommonUtils.SQL_FETCH_SIZE_DEFAULT; > import static > myignite.loading.test.common.CommonUtils.TREAS_LIQUIDITY_CLASS_UNDEFINED; > import static myignite.loading.test.common.CommonUtils.REPLENISH; > import static myignite.loading.test.common.CommonUtils.EXISTING; > import static myignite.loading.test.common.CommonUtils.stopWatchEnd; > import static myignite.loading.test.common.CommonUtils.stopWatchStart; > import org.jooq.lambda.tuple.Tuple2; > > import java.io.File; > import java.io.FileInputStream; > import java.io.IOException; > import java.io.InputStream; > import java.sql.ResultSet; > import java.sql.SQLException; > import java.util.ArrayList; > import java.util.Date; > import java.util.Iterator; > import java.util.Properties; > import java.util.concurrent.atomic.AtomicLong; > > import javax.cache.integration.CacheLoaderException; > > import org.apache.ignite.cache.store.CacheLoadOnlyStoreAdapter; > import org.apache.ignite.internal.util.typedef.T2; > import org.apache.ignite.lang.IgniteBiTuple; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > import org.springframework.dao.DataAccessException; > import org.springframework.jdbc.core.JdbcTemplate; > import org.springframework.jdbc.core.ResultSetExtractor; > import org.springframework.jdbc.datasource.SingleConnectionDataSource; > import org.springframework.util.Assert; > import org.springframework.util.StopWatch; > > import myignite.loading.test.domain.MyDTO; > > public class 
ExistingOrReplenishCacheLoadOnlyStore3 > extends CacheLoadOnlyStoreAdapter<Long, > ArrayList<MyDTO>, > Tuple2<Long,ArrayList<MyDTO>>> { > > private final static Logger logger = > LoggerFactory.getLogger(ExistingOrReplenishCacheLoadOnlyStore3.class); > > private static int SQL_FETCH_SIZE = SQL_FETCH_SIZE_DEFAULT; > private static String dataSourceUrl; > private static String dbUser; > private static String dbPwd; > > private SingleConnectionDataSource DATA_SRC; > > static { > String configDir = System.getProperty(CONFIG_DIR); > Assert.notNull(configDir, "config.dir should be passed as > JVM > arguments..."); > StringBuffer filePath = new StringBuffer(configDir); > filePath.append(File.separatorChar).append("rwa- > batch.properties"); > Properties props = new Properties(); > // InputStream inputStream = > // > ExistingCacheStore.class.getClassLoader().getResourceAsStream(configDir); > try { > InputStream inputStream = new > FileInputStream(filePath.toString()); > Assert.notNull(inputStream, > "FileNotFoundException - property > file '" + filePath + "' not found in > file system"); > props.load(inputStream); > } catch (IOException e) { > e.printStackTrace(); > } > dataSourceUrl = props.getProperty(DATA_SRC_URL); > System.out.println(">>>dataSourceUrl::" + dataSourceUrl); > Assert.notNull(dataSourceUrl, "'rwa.jdbc.url' should be > provided in > rwa-batch.properties..."); > dbUser = props.getProperty(DATA_SRC_USR); > Assert.notNull(dbUser, "'rwa.jdbc.usr' should be provided > in > rwa-batch.properties..."); > dbPwd = System.getProperty(DATA_SRC_PWD); > Assert.notNull(dbPwd, "'rwa.jdbc.pwd' should be provided in > rwa-batch.properties..."); > > String fetchSize = props.getProperty(RWA_SQL_FETCH_SIZE); > if (fetchSize != null) { > SQL_FETCH_SIZE = Integer.valueOf(fetchSize); > } > } > > private JdbcTemplate jdbcTemplate; > > public ExistingOrReplenishCacheLoadOnlyStore3() { > super(); > DATA_SRC = new SingleConnectionDataSource(); > 
DATA_SRC.setDriverClassName("oracle.jdbc.driver. > OracleDriver"); > DATA_SRC.setUrl(dataSourceUrl); > DATA_SRC.setUsername(dbUser); > DATA_SRC.setPassword(dbPwd); > jdbcTemplate = new JdbcTemplate(DATA_SRC); > } > > @Override > protected Iterator<Tuple2<Long,ArrayList<MyDTO>>> > inputIterator(Object... args) throws CacheLoaderException { > if (args == null || args.length < 6) > throw new CacheLoaderException( > "Expected asOf, scenId, > HierarchyServiceProxy and replenish parameters > are not fully provided..."); > try { > final Date asOf = (Date) args[0]; > final String datasetId = (String) args[1]; > final Integer scenId = (Integer) args[2]; > final String sql = (String) args[3]; > final Boolean replenishFlag = (Boolean) args[4]; > Integer startSize =(Integer) args[5]; > logger.debug("AS_OF::{} DATASET_ID::{} SCEN_ID::{} > REP_FLAG::{} > START_SIZE::{}", asOf, datasetId, scenId, > replenishFlag, startSize); > > logger.debug("load{}Cache::SQL::{}", > (replenishFlag ? "Replenish" : > "Existing"), sql); > ArrayList<Tuple2<Long,ArrayList<MyDTO>>> > extOrRepList = null; > // Iterator<Entry<Integer, ArrayList<MyDTO>>> > iterator = null; > // Iterator<ArrayList<MyDTO>> iterator = null; > Iterator<Tuple2<Long,ArrayList<MyDTO>>> > iterator = null; > > // ResultSetExtractor<LinkedHashMap<Integer, > ArrayList<MyDTO>>> > extOrRepMapResultSetExtractor = new > ResultSetExtractor<LinkedHashMap<Integer, ArrayList<MyDTO>>>() { > ResultSetExtractor<ArrayList& > lt;Tuple2<Long,ArrayList<MyDTO>>>> > extOrRepMapResultSetExtractor = new > ResultSetExtractor<ArrayList<Tuple2<Long,ArrayList<MyDTO>>>>() { > @Override > // public LinkedHashMap<Integer, > ArrayList<MyDTO>> > extractData(ResultSet rs) > public ArrayList<Tuple2<Long, > ArrayList<MyDTO>>> > extractData(ResultSet rs) > throws SQLException, > DataAccessException { > > // LinkedHashMap<Integer, > ArrayList<MyDTO>> extOrRepMap = new > LinkedHashMap<Integer, ArrayList<MyDTO>>( > // gockeyCnt, 1.0f); > // 
ArrayList<ArrayList<MyDTO>> > extOrRepList = new > ArrayList<ArrayList<MyDTO>>( > // gockeyCnt); > > ArrayList<Tuple2<Long,ArrayList<MyDTO>>> > extOrRepList = new > ArrayList<Tuple2<Long,ArrayList<MyDTO>>>(startSize); > String prevGoc = null, prevAcct = > null, prevSac = null, prevCcy = null; > Integer prevFpId = null; > ArrayList<MyDTO> currDTOList = > null, prevDTOList = null; > MyDTO dto = null, prevDto = null; > // final AtomicInteger entryCnt = new > AtomicInteger(0); > while (rs.next()) { > int i = 1; > dto = new MyDTO(); > dto.setAsOf(asOf); > dto.setDatasetId(Integer. > valueOf(datasetId)); > dto.setScnId(scenId); > > > dto.setGoc(rs.getString(i++)); > > dto.setAcct(rs.getString(i++)); > dto.setSumAffilCode(rs. > getString(i++)); > > dto.setCcyCode(rs.getString(i++)); > > dto.setFrcstProdId(rs.getInt(i++)); > > dto.setMngSeg(rs.getString(i++)); > > dto.setMngGeo(rs.getString(i++)); > > dto.setFrsBu(rs.getString(i++)); > if (replenishFlag) { > > dto.setReplenishFlag(REPLENISH); > } else { > > dto.setRwaExposureType(rs.getString(i++)); > > dto.setRiskAssetClass(rs.getString(i++)); > > dto.setRiskSubAssetClass(rs.getString(i++)); > String > treasLiqClass = rs.getString(i++); > > dto.setTreasLiqClass((treasLiqClass == null ? > TREAS_LIQUIDITY_CLASS_UNDEFINED :treasLiqClass)); > > dto.setCounterpartyRating(rs.getString(i++)); > > dto.setClearedStatus(rs.getString(i++)); > > dto.setMaturityBand(rs.getString(i++)); > > dto.setDerivativeType(rs.getString(i++)); > > dto.setReplenishFlag(EXISTING); > } > > dto.setStartDate(rs.getDate(i++)); > dto.setMaturityDate(rs. 
> getDate(i++)); > > dto.setAmount(rs.getDouble(i++)); > if(!replenishFlag) { > > dto.setEtlsource(rs.getString(i++)); > }else { > > dto.setInvestmentId(rs.getString(i++)); > } > if > (dto.getGoc().equals(prevGoc) && dto.getAcct().equals(prevAcct) > && > dto.getSumAffilCode().equals(prevSac) && > dto.getCcyCode().equals(prevCcy) > && > dto.getFrcstProdId().equals(prevFpId)) { > > prevDTOList.add(prevDto); > } else { > if (prevDto != > null) { > > prevDTOList.add(prevDto); > // > extOrRepMap.put(entryCnt.incrementAndGet(), prevDTOList); > > extOrRepList.add(new Tuple2<Long, > ArrayList<MyDTO>>(entryCnt.incrementAndGet(),prevDTOList)); > } > currDTOList = new > ArrayList<MyDTO>(); > } > prevDto = dto; > prevDTOList = currDTOList; > prevGoc = dto.getGoc(); > prevAcct = dto.getAcct(); > prevSac = > dto.getSumAffilCode(); > prevCcy = dto.getCcyCode(); > prevFpId = > dto.getFrcstProdId(); > } > if (prevDto != null) { > prevDTOList.add(prevDto); > // > extOrRepMap.put(entryCnt.incrementAndGet(), > prevDTOList); > extOrRepList.add(new > Tuple2<Long, > ArrayList<MyDTO>>(entryCnt.incrementAndGet(),prevDTOList)); > } > // return extOrRepMap; > return extOrRepList; > } > > }; > > jdbcTemplate.setFetchSize(SQL_FETCH_SIZE); > StopWatch sw = new StopWatch(); > stopWatchStart(sw, "populatingDataMap"); > logger.debug("BEFORE populatingDataMap_STARTS!!!!") > ; > // extOrRepMap = jdbcTemplate.query(sql, > extOrRepMapResultSetExtractor); > extOrRepList = jdbcTemplate.query(sql, > extOrRepMapResultSetExtractor); > logger.debug("BEFORE populatingDataMap_ENDS!!!!"); > stopWatchEnd(sw); > > if (extOrRepList != null) { > // iterator = extOrRepMap.entrySet(). 
> iterator(); > /* > * RECORDS COUNT PRINTED CORRECTLY HERE > BEFORE PASSING TO IGNITE > */ > logger.debug("+++++++ GOC_KEY > COUNT::{}",extOrRepList.size()); > iterator = extOrRepList.iterator(); > } > return iterator; > } finally { > DATA_SRC.destroy(); > } > } > > @Override > protected IgniteBiTuple<Long, ArrayList<MyDTO>> > parse(Tuple2<Long,ArrayList<MyDTO>> rec, Object... args) { > return new T2<>(rec.v1(), rec.v2()); > } > > } > > > > > -- > View this message in context: http://apache-ignite-users. > 70518.x6.nabble.com/Missing-records-Ignite-cache-size- > grows-tp10809p10871.html > Sent from the Apache Ignite Users mailing list archive at Nabble.com. > -- Best regards, Andrey V. Mashenkov
