package myignite.loading.test.cache.store;
import static myignite.loading.test.common.CommonUtils.CONFIG_DIR;
import static myignite.loading.test.common.CommonUtils.DATA_SRC_PWD;
import static myignite.loading.test.common.CommonUtils.DATA_SRC_URL;
import static myignite.loading.test.common.CommonUtils.DATA_SRC_USR;
import static myignite.loading.test.common.CommonUtils.RWA_SQL_FETCH_SIZE;
import static
myignite.loading.test.common.CommonUtils.SQL_FETCH_SIZE_DEFAULT;
import static
myignite.loading.test.common.CommonUtils.TREAS_LIQUIDITY_CLASS_UNDEFINED;
import static myignite.loading.test.common.CommonUtils.REPLENISH;
import static myignite.loading.test.common.CommonUtils.EXISTING;
import static myignite.loading.test.common.CommonUtils.stopWatchEnd;
import static myignite.loading.test.common.CommonUtils.stopWatchStart;
import org.jooq.lambda.tuple.Tuple2;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.integration.CacheLoaderException;
import org.apache.ignite.cache.store.CacheLoadOnlyStoreAdapter;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteBiTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.datasource.SingleConnectionDataSource;
import org.springframework.util.Assert;
import org.springframework.util.StopWatch;
import myignite.loading.test.domain.MyDTO;
public class ExistingOrReplenishCacheLoadOnlyStore3
extends CacheLoadOnlyStoreAdapter<Long, ArrayList<MyDTO>,
Tuple2<Long,ArrayList<MyDTO>>> {
private final static Logger logger =
LoggerFactory.getLogger(ExistingOrReplenishCacheLoadOnlyStore3.class);
private static int SQL_FETCH_SIZE = SQL_FETCH_SIZE_DEFAULT;
private static String dataSourceUrl;
private static String dbUser;
private static String dbPwd;
private SingleConnectionDataSource DATA_SRC;
static {
String configDir = System.getProperty(CONFIG_DIR);
Assert.notNull(configDir, "config.dir should be passed as JVM
arguments...");
StringBuffer filePath = new StringBuffer(configDir);
filePath.append(File.separatorChar).append("rwa-batch.properties");
Properties props = new Properties();
// InputStream inputStream =
//
ExistingCacheStore.class.getClassLoader().getResourceAsStream(configDir);
try {
InputStream inputStream = new
FileInputStream(filePath.toString());
Assert.notNull(inputStream,
"FileNotFoundException - property file
'" + filePath + "' not found in
file system");
props.load(inputStream);
} catch (IOException e) {
e.printStackTrace();
}
dataSourceUrl = props.getProperty(DATA_SRC_URL);
System.out.println(">>>dataSourceUrl::" + dataSourceUrl);
Assert.notNull(dataSourceUrl, "'rwa.jdbc.url' should be
provided in
rwa-batch.properties...");
dbUser = props.getProperty(DATA_SRC_USR);
Assert.notNull(dbUser, "'rwa.jdbc.usr' should be provided in
rwa-batch.properties...");
dbPwd = System.getProperty(DATA_SRC_PWD);
Assert.notNull(dbPwd, "'rwa.jdbc.pwd' should be provided in
rwa-batch.properties...");
String fetchSize = props.getProperty(RWA_SQL_FETCH_SIZE);
if (fetchSize != null) {
SQL_FETCH_SIZE = Integer.valueOf(fetchSize);
}
}
private JdbcTemplate jdbcTemplate;
public ExistingOrReplenishCacheLoadOnlyStore3() {
super();
DATA_SRC = new SingleConnectionDataSource();
DATA_SRC.setDriverClassName("oracle.jdbc.driver.OracleDriver");
DATA_SRC.setUrl(dataSourceUrl);
DATA_SRC.setUsername(dbUser);
DATA_SRC.setPassword(dbPwd);
jdbcTemplate = new JdbcTemplate(DATA_SRC);
}
@Override
protected Iterator<Tuple2<Long,ArrayList<MyDTO>>>
inputIterator(Object... args) throws CacheLoaderException {
if (args == null || args.length < 6)
throw new CacheLoaderException(
"Expected asOf, scenId,
HierarchyServiceProxy and replenish parameters
are not fully provided...");
try {
final Date asOf = (Date) args[0];
final String datasetId = (String) args[1];
final Integer scenId = (Integer) args[2];
final String sql = (String) args[3];
final Boolean replenishFlag = (Boolean) args[4];
Integer startSize =(Integer) args[5];
logger.debug("AS_OF::{} DATASET_ID::{} SCEN_ID::{}
REP_FLAG::{}
START_SIZE::{}", asOf, datasetId, scenId,
replenishFlag, startSize);
logger.debug("load{}Cache::SQL::{}", (replenishFlag ?
"Replenish" :
"Existing"), sql);
ArrayList<Tuple2<Long,ArrayList<MyDTO>>>
extOrRepList = null;
// Iterator<Entry<Integer, ArrayList<MyDTO>>>
iterator = null;
// Iterator<ArrayList<MyDTO>> iterator = null;
Iterator<Tuple2<Long,ArrayList<MyDTO>>> iterator
= null;
// ResultSetExtractor<LinkedHashMap<Integer,
ArrayList<MyDTO>>>
extOrRepMapResultSetExtractor = new
ResultSetExtractor<LinkedHashMap<Integer, ArrayList<MyDTO>>>() {
ResultSetExtractor<ArrayList<Tuple2<Long,ArrayList<MyDTO>>>>
extOrRepMapResultSetExtractor = new
ResultSetExtractor<ArrayList<Tuple2<Long,ArrayList<MyDTO>>>>() {
@Override
// public LinkedHashMap<Integer,
ArrayList<MyDTO>>
extractData(ResultSet rs)
public
ArrayList<Tuple2<Long,ArrayList<MyDTO>>>
extractData(ResultSet rs)
throws SQLException,
DataAccessException {
// LinkedHashMap<Integer,
ArrayList<MyDTO>> extOrRepMap = new
LinkedHashMap<Integer, ArrayList<MyDTO>>(
// gockeyCnt, 1.0f);
// ArrayList<ArrayList<MyDTO>>
extOrRepList = new
ArrayList<ArrayList<MyDTO>>(
// gockeyCnt);
ArrayList<Tuple2<Long,ArrayList<MyDTO>>> extOrRepList = new
ArrayList<Tuple2<Long,ArrayList<MyDTO>>>(startSize);
String prevGoc = null, prevAcct = null,
prevSac = null, prevCcy = null;
Integer prevFpId = null;
ArrayList<MyDTO> currDTOList = null,
prevDTOList = null;
MyDTO dto = null, prevDto = null;
// final AtomicInteger entryCnt = new
AtomicInteger(0);
while (rs.next()) {
int i = 1;
dto = new MyDTO();
dto.setAsOf(asOf);
dto.setDatasetId(Integer.valueOf(datasetId));
dto.setScnId(scenId);
dto.setGoc(rs.getString(i++));
dto.setAcct(rs.getString(i++));
dto.setSumAffilCode(rs.getString(i++));
dto.setCcyCode(rs.getString(i++));
dto.setFrcstProdId(rs.getInt(i++));
dto.setMngSeg(rs.getString(i++));
dto.setMngGeo(rs.getString(i++));
dto.setFrsBu(rs.getString(i++));
if (replenishFlag) {
dto.setReplenishFlag(REPLENISH);
} else {
dto.setRwaExposureType(rs.getString(i++));
dto.setRiskAssetClass(rs.getString(i++));
dto.setRiskSubAssetClass(rs.getString(i++));
String treasLiqClass =
rs.getString(i++);
dto.setTreasLiqClass((treasLiqClass == null ?
TREAS_LIQUIDITY_CLASS_UNDEFINED :treasLiqClass));
dto.setCounterpartyRating(rs.getString(i++));
dto.setClearedStatus(rs.getString(i++));
dto.setMaturityBand(rs.getString(i++));
dto.setDerivativeType(rs.getString(i++));
dto.setReplenishFlag(EXISTING);
}
dto.setStartDate(rs.getDate(i++));
dto.setMaturityDate(rs.getDate(i++));
dto.setAmount(rs.getDouble(i++));
if(!replenishFlag) {
dto.setEtlsource(rs.getString(i++));
}else {
dto.setInvestmentId(rs.getString(i++));
}
if
(dto.getGoc().equals(prevGoc) && dto.getAcct().equals(prevAcct)
&&
dto.getSumAffilCode().equals(prevSac) &&
dto.getCcyCode().equals(prevCcy)
&&
dto.getFrcstProdId().equals(prevFpId)) {
prevDTOList.add(prevDto);
} else {
if (prevDto != null) {
prevDTOList.add(prevDto);
//
extOrRepMap.put(entryCnt.incrementAndGet(), prevDTOList);
extOrRepList.add(new Tuple2<Long,
ArrayList<MyDTO>>(entryCnt.incrementAndGet(),prevDTOList));
}
currDTOList = new
ArrayList<MyDTO>();
}
prevDto = dto;
prevDTOList = currDTOList;
prevGoc = dto.getGoc();
prevAcct = dto.getAcct();
prevSac = dto.getSumAffilCode();
prevCcy = dto.getCcyCode();
prevFpId = dto.getFrcstProdId();
}
if (prevDto != null) {
prevDTOList.add(prevDto);
//
extOrRepMap.put(entryCnt.incrementAndGet(), prevDTOList);
extOrRepList.add(new
Tuple2<Long,
ArrayList<MyDTO>>(entryCnt.incrementAndGet(),prevDTOList));
}
// return extOrRepMap;
return extOrRepList;
}
};
jdbcTemplate.setFetchSize(SQL_FETCH_SIZE);
StopWatch sw = new StopWatch();
stopWatchStart(sw, "populatingDataMap");
logger.debug("BEFORE populatingDataMap_STARTS!!!!");
// extOrRepMap = jdbcTemplate.query(sql,
extOrRepMapResultSetExtractor);
extOrRepList = jdbcTemplate.query(sql,
extOrRepMapResultSetExtractor);
logger.debug("BEFORE populatingDataMap_ENDS!!!!");
stopWatchEnd(sw);
if (extOrRepList != null) {
// iterator = extOrRepMap.entrySet().iterator();
/*
* RECORDS COUNT PRINTED CORRECTLY HERE BEFORE
PASSING TO IGNITE
*/
logger.debug("+++++++ GOC_KEY
COUNT::{}",extOrRepList.size());
iterator = extOrRepList.iterator();
}
return iterator;
} finally {
DATA_SRC.destroy();
}
}
@Override
protected IgniteBiTuple<Long, ArrayList<MyDTO>>
parse(Tuple2<Long,ArrayList<MyDTO>> rec, Object... args) {
return new T2<>(rec.v1(), rec.v2());
}
}
--
View this message in context:
http://apache-ignite-users.70518.x6.nabble.com/Missing-records-Ignite-cache-size-grows-tp10809p10871.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.