package myignite.loading.test.cache.store;

import static myignite.loading.test.common.CommonUtils.CONFIG_DIR;
import static myignite.loading.test.common.CommonUtils.DATA_SRC_PWD;
import static myignite.loading.test.common.CommonUtils.DATA_SRC_URL;
import static myignite.loading.test.common.CommonUtils.DATA_SRC_USR;
import static myignite.loading.test.common.CommonUtils.EXISTING;
import static myignite.loading.test.common.CommonUtils.REPLENISH;
import static myignite.loading.test.common.CommonUtils.RWA_SQL_FETCH_SIZE;
import static myignite.loading.test.common.CommonUtils.SQL_FETCH_SIZE_DEFAULT;
import static myignite.loading.test.common.CommonUtils.TREAS_LIQUIDITY_CLASS_UNDEFINED;
import static myignite.loading.test.common.CommonUtils.stopWatchEnd;
import static myignite.loading.test.common.CommonUtils.stopWatchStart;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import javax.cache.integration.CacheLoaderException;

import org.apache.ignite.cache.store.CacheLoadOnlyStoreAdapter;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jooq.lambda.tuple.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.datasource.SingleConnectionDataSource;
import org.springframework.util.Assert;
import org.springframework.util.StopWatch;

import myignite.loading.test.domain.MyDTO;

public class ExistingOrReplenishCacheLoadOnlyStore3
                extends CacheLoadOnlyStoreAdapter<Long, ArrayList&lt;MyDTO>,
Tuple2<Long,ArrayList&lt;MyDTO>>> {

        private final static Logger logger =
LoggerFactory.getLogger(ExistingOrReplenishCacheLoadOnlyStore3.class);

        private static int SQL_FETCH_SIZE = SQL_FETCH_SIZE_DEFAULT;
        private static String dataSourceUrl;
        private static String dbUser;
        private static String dbPwd;

        private SingleConnectionDataSource DATA_SRC;

        static {
                String configDir = System.getProperty(CONFIG_DIR);
                Assert.notNull(configDir, "config.dir should be passed as JVM
arguments...");
                StringBuffer filePath = new StringBuffer(configDir);
                
filePath.append(File.separatorChar).append("rwa-batch.properties");
                Properties props = new Properties();
                // InputStream inputStream =
                //
ExistingCacheStore.class.getClassLoader().getResourceAsStream(configDir);
                try {
                        InputStream inputStream = new 
FileInputStream(filePath.toString());
                        Assert.notNull(inputStream,
                                        "FileNotFoundException - property file 
'" + filePath + "' not found in
file system");
                        props.load(inputStream);
                } catch (IOException e) {
                        e.printStackTrace();
                }
                dataSourceUrl = props.getProperty(DATA_SRC_URL);
                System.out.println(">>>dataSourceUrl::" + dataSourceUrl);
                Assert.notNull(dataSourceUrl, "'rwa.jdbc.url' should be 
provided in
rwa-batch.properties...");
                dbUser = props.getProperty(DATA_SRC_USR);
                Assert.notNull(dbUser, "'rwa.jdbc.usr' should be provided in
rwa-batch.properties...");
                dbPwd = System.getProperty(DATA_SRC_PWD);
                Assert.notNull(dbPwd, "'rwa.jdbc.pwd' should be provided in
rwa-batch.properties...");
                
                String fetchSize = props.getProperty(RWA_SQL_FETCH_SIZE);
                if (fetchSize != null) {
                        SQL_FETCH_SIZE = Integer.valueOf(fetchSize);
                }
        }

        private JdbcTemplate jdbcTemplate;
        
        public ExistingOrReplenishCacheLoadOnlyStore3() {
                super();
                DATA_SRC = new SingleConnectionDataSource();
                DATA_SRC.setDriverClassName("oracle.jdbc.driver.OracleDriver");
                DATA_SRC.setUrl(dataSourceUrl);
                DATA_SRC.setUsername(dbUser);
                DATA_SRC.setPassword(dbPwd);
                jdbcTemplate = new JdbcTemplate(DATA_SRC);
        }

        @Override
        protected Iterator<Tuple2&lt;Long,ArrayList&lt;MyDTO>>>
inputIterator(Object... args) throws CacheLoaderException {
                if (args == null || args.length < 6)
                        throw new CacheLoaderException(
                                        "Expected asOf, scenId, 
HierarchyServiceProxy and replenish parameters
are not fully provided...");
                try {
                        final Date asOf = (Date) args[0];
                        final String datasetId = (String) args[1];
                        final Integer scenId = (Integer) args[2];
                        final String sql = (String) args[3];
                        final Boolean replenishFlag = (Boolean) args[4];
                        Integer startSize =(Integer) args[5];
                    logger.debug("AS_OF::{} DATASET_ID::{} SCEN_ID::{} 
REP_FLAG::{}
START_SIZE::{}", asOf, datasetId, scenId,
                                replenishFlag, startSize);

                        logger.debug("load{}Cache::SQL::{}", (replenishFlag ? 
"Replenish" :
"Existing"), sql);
                        ArrayList<Tuple2&lt;Long,ArrayList&lt;MyDTO>>> 
extOrRepList = null;
//                      Iterator<Entry&lt;Integer, ArrayList&lt;MyDTO>>> 
iterator = null;
//                      Iterator<ArrayList&lt;MyDTO>> iterator = null;
                        Iterator<Tuple2&lt;Long,ArrayList&lt;MyDTO>>> iterator 
= null;

//                      ResultSetExtractor<LinkedHashMap&lt;Integer, 
ArrayList&lt;MyDTO>>>
extOrRepMapResultSetExtractor = new
ResultSetExtractor<LinkedHashMap&lt;Integer, ArrayList&lt;MyDTO>>>() {
                        
ResultSetExtractor<ArrayList&lt;Tuple2&lt;Long,ArrayList&lt;MyDTO>>>>
extOrRepMapResultSetExtractor = new
ResultSetExtractor<ArrayList&lt;Tuple2&lt;Long,ArrayList&lt;MyDTO>>>>() {
                                @Override
//                              public LinkedHashMap<Integer, 
ArrayList&lt;MyDTO>>
extractData(ResultSet rs)
                                public 
ArrayList<Tuple2&lt;Long,ArrayList&lt;MyDTO>>>
extractData(ResultSet rs)
                                                throws SQLException, 
DataAccessException {

//                                      LinkedHashMap<Integer, 
ArrayList&lt;MyDTO>> extOrRepMap = new
LinkedHashMap<Integer, ArrayList&lt;MyDTO>>(
//                                                      gockeyCnt, 1.0f);
//                                      ArrayList<ArrayList&lt;MyDTO>> 
extOrRepList = new
ArrayList<ArrayList&lt;MyDTO>>(
//                                                      gockeyCnt);
                                        
ArrayList<Tuple2&lt;Long,ArrayList&lt;MyDTO>>> extOrRepList = new
ArrayList<Tuple2&lt;Long,ArrayList&lt;MyDTO>>>(startSize);
                                        String prevGoc = null, prevAcct = null, 
prevSac = null, prevCcy = null;
                                        Integer prevFpId = null;
                                        ArrayList<MyDTO> currDTOList = null, 
prevDTOList = null;
                                        MyDTO dto = null, prevDto = null;
//                                      final AtomicInteger entryCnt = new 
AtomicInteger(0);
                                        while (rs.next()) {
                                                int i = 1;
                                                dto = new MyDTO();
                                                dto.setAsOf(asOf);
                                                
dto.setDatasetId(Integer.valueOf(datasetId));
                                                dto.setScnId(scenId);

                                                dto.setGoc(rs.getString(i++));
                                                dto.setAcct(rs.getString(i++));
                                                
dto.setSumAffilCode(rs.getString(i++));
                                                
dto.setCcyCode(rs.getString(i++));
                                                
dto.setFrcstProdId(rs.getInt(i++));
                                                
dto.setMngSeg(rs.getString(i++));
                                                
dto.setMngGeo(rs.getString(i++));
                                                dto.setFrsBu(rs.getString(i++));
                                                if (replenishFlag) {
                                                        
dto.setReplenishFlag(REPLENISH);
                                                } else {
                                                        
dto.setRwaExposureType(rs.getString(i++));
                                                        
dto.setRiskAssetClass(rs.getString(i++));
                                                        
dto.setRiskSubAssetClass(rs.getString(i++));
                                                        String treasLiqClass = 
rs.getString(i++);
                                                        
dto.setTreasLiqClass((treasLiqClass == null ?
TREAS_LIQUIDITY_CLASS_UNDEFINED :treasLiqClass));
                                                        
dto.setCounterpartyRating(rs.getString(i++));
                                                        
dto.setClearedStatus(rs.getString(i++));
                                                        
dto.setMaturityBand(rs.getString(i++));
                                                        
dto.setDerivativeType(rs.getString(i++));
                                                        
dto.setReplenishFlag(EXISTING);
                                                }
                                                
dto.setStartDate(rs.getDate(i++));
                                                
dto.setMaturityDate(rs.getDate(i++));
                                                
dto.setAmount(rs.getDouble(i++));
                                                if(!replenishFlag) {
                                                        
dto.setEtlsource(rs.getString(i++));
                                                }else {
                                                        
dto.setInvestmentId(rs.getString(i++));
                                                }
                                                if 
(dto.getGoc().equals(prevGoc) && dto.getAcct().equals(prevAcct)
                                                                && 
dto.getSumAffilCode().equals(prevSac) &&
dto.getCcyCode().equals(prevCcy)
                                                                && 
dto.getFrcstProdId().equals(prevFpId)) {
                                                        
prevDTOList.add(prevDto);
                                                } else {
                                                        if (prevDto != null) {
                                                                
prevDTOList.add(prevDto);
//                                                              
extOrRepMap.put(entryCnt.incrementAndGet(), prevDTOList);
                                                                
extOrRepList.add(new Tuple2<Long,
ArrayList&lt;MyDTO>>(entryCnt.incrementAndGet(),prevDTOList));
                                                        }
                                                        currDTOList = new 
ArrayList<MyDTO>();
                                                }
                                                prevDto = dto;
                                                prevDTOList = currDTOList;
                                                prevGoc = dto.getGoc();
                                                prevAcct = dto.getAcct();
                                                prevSac = dto.getSumAffilCode();
                                                prevCcy = dto.getCcyCode();
                                                prevFpId = dto.getFrcstProdId();
                                        }
                                        if (prevDto != null) {
                                                prevDTOList.add(prevDto);
//                                              
extOrRepMap.put(entryCnt.incrementAndGet(), prevDTOList);
                                                extOrRepList.add(new 
Tuple2<Long,
ArrayList&lt;MyDTO>>(entryCnt.incrementAndGet(),prevDTOList));
                                        }
//                                      return extOrRepMap;
                                        return extOrRepList;
                                }

                        };

                        jdbcTemplate.setFetchSize(SQL_FETCH_SIZE);
                        StopWatch sw = new StopWatch();
                        stopWatchStart(sw, "populatingDataMap");
                        logger.debug("BEFORE populatingDataMap_STARTS!!!!");
//                      extOrRepMap = jdbcTemplate.query(sql, 
extOrRepMapResultSetExtractor);                   
                        extOrRepList = jdbcTemplate.query(sql, 
extOrRepMapResultSetExtractor);
                        logger.debug("BEFORE populatingDataMap_ENDS!!!!");
                        stopWatchEnd(sw);

                        if (extOrRepList != null) {
//                              iterator = extOrRepMap.entrySet().iterator();
                                /*
                                 * RECORDS COUNT PRINTED CORRECTLY HERE BEFORE 
PASSING TO IGNITE
                                 */
                                logger.debug("+++++++ GOC_KEY 
COUNT::{}",extOrRepList.size());
                                iterator = extOrRepList.iterator();
                        }
                        return iterator;
                } finally {
                        DATA_SRC.destroy();
                }
        }

        @Override
        protected IgniteBiTuple<Long, ArrayList&lt;MyDTO>>
parse(Tuple2<Long,ArrayList&lt;MyDTO>> rec, Object... args) {
                return new T2<>(rec.v1(), rec.v2());
        }

}




// --
// View this message in context:
// http://apache-ignite-users.70518.x6.nabble.com/Missing-records-Ignite-cache-size-grows-tp10809p10871.html
// Sent from the Apache Ignite Users mailing list archive at Nabble.com.
//
// Reply via email to