Re: Data streamer has been cancelled

2020-05-18 Thread nithin91
Got it. Thanks a lot. This is very useful.





Re: Data streamer has been cancelled

2020-05-18 Thread Manuel Núñez Sánchez
Since there are several approaches to solving this, I'll continue with your code. A couple of suggestions:

- Create the streamer instance outside the loop, and close it in a finally block, not within the loop.
- Call stmr.autoFlushFrequency(0), since you flush manually every 2000 elements anyway.
- Don't forget the remaining data (< 2000 entries) from the last iteration.

IgniteDataStreamer stmr =
    ignite.dataStreamer("PieCountryAllocationCache");
stmr.allowOverwrite(true);
// disable auto flush - we'll do it manually every 2000 entries
stmr.autoFlushFrequency(0);

try {
    int j = 0;
    for (Map.Entry entry : PieCountryAllocationobjs.entrySet()) {

        tempobjs.put(entry.getKey(), entry.getValue());
        // for every 2000 rows, call stmr.addData(tempobjs) and then stmr.flush()
        if (++j == 2000) {
            System.out.println(j);
            stmr.addData(tempobjs);
            // do flush every 2000 items
            stmr.flush();
            tempobjs.clear();
            System.out.println("Stream Ended");
            System.out.println(j);
            j = 0;
        }
    }

    // stream remaining data (< 2000 entries)
    if (!tempobjs.isEmpty()) {
        stmr.addData(tempobjs);
    }

} finally {
    stmr.flush();
    stmr.close(false);
}
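
One possible variation (just a sketch, not something tested against your setup): the streamer already buffers and batches entries per node internally, so the temporary map can be dropped and addData(key, value) called per entry inside a try-with-resources block, which flushes on close. The cache name below is from your post; the key/value types and the helper method are assumed, since the archive stripped the generics:

import java.util.Map;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;

public class StreamPerEntry {
    // Hypothetical helper; key/value types are assumed.
    static void load(Ignite ignite, Map<String, Object> objs) {
        try (IgniteDataStreamer<String, Object> stmr =
                 ignite.dataStreamer("PieCountryAllocationCache")) {
            stmr.allowOverwrite(true);
            // The streamer buffers and batches entries per node on its own,
            // so per-entry addData() calls are fine.
            for (Map.Entry<String, Object> e : objs.entrySet())
                stmr.addData(e.getKey(), e.getValue());
        } // close() flushes all remaining buffered data before returning
    }
}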

> On 18 May 2020, at 12:36, nithin91 wrote:
> 
> int j = 0;
> for (Map.Entry entry : PieCountryAllocationobjs.entrySet()) {
>
>     tempobjs.put(entry.getKey(), entry.getValue());
>     // For every 2000 rows I am calling stmr.addData(tempobjs) and then
>     // stmr.flush and stmr.close(false)
>     if ((j % 2000 == 0 && j != 0) ||
>         (PieCountryAllocationobjs.keySet().size() < 2000 &&
>          j == PieCountryAllocationobjs.keySet().size()) ||
>         j == PieCountryAllocationobjs.keySet().size()) {
>
>         System.out.println(j);
>         IgniteDataStreamer stmr =
>             ignite.dataStreamer("PieCountryAllocationCache");
>         stmr.allowOverwrite(true);
>         stmr.addData(tempobjs);
>         stmr.flush();
>         stmr.close(false);
>         tempobjs.clear();
>         System.out.println("Stream Ended");
>         System.out.println(j);
>     }
>     j++;
> }



Re: Data streamer has been cancelled

2020-05-18 Thread nithin91
Hi,

I implemented the code as you suggested. Please find the related code below, and let me know whether this is the right way of implementing what you suggested.

Also, can you please explain the use of the stmr.autoFlushFrequency(2000) method? If I pass a higher number to this method, will that improve performance?
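
My understanding so far is that autoFlushFrequency takes a period in milliseconds rather than an entry count, so a higher value mainly means fewer, larger flushes at the cost of the data showing up in the cache later. A rough sketch of what I mean (cache name from this thread; key/value types assumed):

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;

class AutoFlushSketch {
    // Hypothetical helper; key/value types are assumed.
    static IgniteDataStreamer<String, Object> openStreamer(Ignite ignite) {
        IgniteDataStreamer<String, Object> stmr =
            ignite.dataStreamer("PieCountryAllocationCache");
        stmr.allowOverwrite(true);
        // autoFlushFrequency is a time interval in milliseconds, not a row
        // count: with 2000, buffered entries are flushed roughly every two
        // seconds. 0 disables periodic flushing, so entries are sent only
        // when a per-node buffer fills or on an explicit flush()/close().
        stmr.autoFlushFrequency(2000);
        return stmr;
    }
}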

Map Originalobjs = new HashMap(); // contains all 0.1 million key-value pairs that have to be loaded

Map tempobjs = new HashMap(); // temporary map that holds only 2000 records at a time, which are pushed to the cache using the data streamer

int j = 0;
for (Map.Entry entry : PieCountryAllocationobjs.entrySet()) {

    tempobjs.put(entry.getKey(), entry.getValue());
    // For every 2000 rows I am calling stmr.addData(tempobjs) and then
    // stmr.flush and stmr.close(false)
    if ((j % 2000 == 0 && j != 0) ||
        (PieCountryAllocationobjs.keySet().size() < 2000 &&
         j == PieCountryAllocationobjs.keySet().size()) ||
        j == PieCountryAllocationobjs.keySet().size()) {

        System.out.println(j);
        IgniteDataStreamer stmr =
            ignite.dataStreamer("PieCountryAllocationCache");
        stmr.allowOverwrite(true);
        stmr.addData(tempobjs);
        stmr.flush();
        stmr.close(false);
        tempobjs.clear();
        System.out.println("Stream Ended");
        System.out.println(j);
    }
    j++;
}





Re: Data streamer has been cancelled

2020-05-17 Thread Manuel Núñez
To improve performance, use addData in batch mode (with a map), for example every 2000 entries, and use a finally block with flush() and close(false) on the streamer to ensure the data has been properly loaded.
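
Roughly this shape (just a sketch; the cache name is from your post, while the key/value types, the Ignite instance "ignite" and the source map "source" are assumed):

IgniteDataStreamer<String, Object> stmr =
    ignite.dataStreamer("PieCountryAllocationCache");
stmr.allowOverwrite(true);
Map<String, Object> batch = new HashMap<>();
try {
    for (Map.Entry<String, Object> e : source.entrySet()) {
        batch.put(e.getKey(), e.getValue());
        if (batch.size() == 2000) {      // send in batches of 2000 entries
            stmr.addData(batch);
            batch = new HashMap<>();
        }
    }
    if (!batch.isEmpty())
        stmr.addData(batch);             // last partial batch
} finally {
    stmr.flush();                        // push anything still buffered
    stmr.close(false);                   // false = flush remaining data, do not cancel
}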

Cheers!

Manuel.

> On 17 May 2020, at 12:08, nithin91 wrote:
> 
> Hi,
>
> Currently I am trying to load data into an Ignite cache using a data
> streamer from an Oracle DB.
>
> I have two server nodes deployed on two Linux servers, and I am executing
> this as a standalone Java program from my local machine.
>
> To achieve this I have followed the steps below.
>
> 1. Start in client mode by setting clientMode=true in the bean file.
> 2. Fetch the data from Oracle DB using a JDBC ResultSet, set the fetch size
> to 10 on the prepared statement, and load this data into a temporary Map
> object.
> 3. Iterate through the map object and load the data into the cache using
> the stmr.addData API of the corresponding data streamer.
>
> Out of 1 lakh (100,000) rows, only 35K rows are getting loaded, and the
> client node is stopped all of a sudden. Can anyone please help me in
> resolving this issue?
>
> Following is the log generated by this program. Attached are the program
> code that I am executing and the Client.xml file for your reference.
> Client.xml
> <http://apache-ignite-users.70518.x6.nabble.com/file/t2737/Client.xml>
> Java_Program.txt
> <http://apache-ignite-users.70518.x6.nabble.com/file/t2737/Java_Program.txt>
>
> My understanding after looking at the log is that stmr.addData returns an
> IgniteFuture, which means it is an async operation, and the program ends
> after the iteration completes with some data yet to be loaded into the
> cache.
>
> [...]

Data streamer has been cancelled

2020-05-17 Thread nithin91
Hi,

Currently I am trying to load data into an Ignite cache using a data
streamer from an Oracle DB.

I have two server nodes deployed on two Linux servers, and I am executing
this as a standalone Java program from my local machine.

To achieve this I have followed the steps below.

1. Start in client mode by setting clientMode=true in the bean file.
2. Fetch the data from Oracle DB using a JDBC ResultSet, set the fetch size
to 10 on the prepared statement, and load this data into a temporary Map
object.
3. Iterate through the map object and load the data into the cache using
the stmr.addData API of the corresponding data streamer (see the sketch
below).
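
For illustration only, the overall flow looks roughly like the sketch below (placeholder SQL, connection details, and types; this is not the attached program):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;

public class LoadSketch {
    public static void main(String[] args) throws Exception {
        // Step 1: start in client mode (Client.xml sets clientMode=true).
        try (Ignite ignite = Ignition.start("Client.xml")) {
            // Step 2: fetch rows from Oracle into a temporary map.
            Map<String, String> objs = new HashMap<>();
            try (Connection con = DriverManager.getConnection(
                     "jdbc:oracle:thin:@//dbhost:1521/service", "user", "pwd");
                 PreparedStatement ps = con.prepareStatement(
                     "SELECT key_col, val_col FROM some_table")) {
                ps.setFetchSize(10);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next())
                        objs.put(rs.getString(1), rs.getString(2));
                }
            }
            // Step 3: stream the map into the cache; flush before exiting so
            // nothing is left sitting in the streamer buffers.
            try (IgniteDataStreamer<String, String> stmr =
                     ignite.dataStreamer("PieCountryAllocationCache")) {
                stmr.allowOverwrite(true);
                stmr.addData(objs);
                stmr.flush();
            }
        }
    }
}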

Out of 1 lakh (100,000) rows, only 35K rows are getting loaded, and the
client node is stopped all of a sudden. Can anyone please help me in
resolving this issue?


Following is the log generated by this program. Attached are the program
code that I am executing and the Client.xml file for your reference.
Client.xml
<http://apache-ignite-users.70518.x6.nabble.com/file/t2737/Client.xml>  
Java_Program.txt
<http://apache-ignite-users.70518.x6.nabble.com/file/t2737/Java_Program.txt>  

My understanding after looking at the log is that stmr.addData returns an
IgniteFuture, which means it is an async operation, and the program ends
after the iteration completes with some data yet to be loaded into the cache.
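
If that is the case, something along these lines would be needed before the program exits (just a sketch; the variable names and key/value types are assumed):

import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.lang.IgniteFuture;

class FlushBeforeExit {
    // Hypothetical helper; key/value types are assumed.
    static void finish(IgniteDataStreamer<String, Object> stmr,
                       String key, Object val) {
        // addData() only enqueues the entry and returns a future;
        // it does not wait for the entry to reach the cache.
        IgniteFuture<?> fut = stmr.addData(key, val);

        // Flush and close the streamer before the program ends, so every
        // buffered entry is actually written to the cache.
        stmr.flush();
        stmr.close(false);   // false = flush remaining data, do not cancel

        // After the flush, the future reports whether this entry made it.
        fut.get();
    }
}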


May 17, 2020 1:35:29 PM org.apache.ignite.logger.java.JavaLogger error
SEVERE: DataStreamer operation failed.
class org.apache.ignite.IgniteCheckedException: Data streamer has been
cancelled: DataStreamerImpl [bufLdrSzPerThread=4096,
rcvr=org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$IsolatedUpdater@3b0ee03a,
ioPlcRslvr=null, cacheName=PieCountryAllocationCache, bufSize=512,
parallelOps=0, timeout=-1, autoFlushFreq=0,
bufMappings={be10cd31-aed7-448a-8fec-60fd72a62313=Buffer
[node=TcpDiscoveryNode [id=be10cd31-aed7-448a-8fec-60fd72a62313,
addrs=[127.0.0.1, 172.30.197.5], sockAddrs=[/127.0.0.1:47500,
azuswvlnx00687.corp.frk.com/172.30.197.5:47500], discPort=47500, order=1,
intOrder=1, lastExchangeTime=1589702634687, loc=false,
ver=2.7.6#20190911-sha1:21f7ca41, isClient=false], isLocNode=false,
idGen=85, sem=java.util.concurrent.Semaphore@5b12012e[Permits = 15],
perNodeParallelOps=64, entriesCnt=2944, locFutsSize=0, reqsSize=49],
92cbed29-93d6-428d-a3da-a30e4264aa20=Buffer [node=TcpDiscoveryNode
[id=92cbed29-93d6-428d-a3da-a30e4264aa20, addrs=[127.0.0.1, 172.30.197.6],
sockAddrs=[azuswvlnx00688.corp.frk.com/172.30.197.6:47500,
/127.0.0.1:47500], discPort=47500, order=2, intOrder=2,
lastExchangeTime=1589702635145, loc=false, ver=2.7.6#20190911-sha1:21f7ca41,
isClient=false], isLocNode=false, idGen=99,
sem=java.util.concurrent.Semaphore@2f7dcef2[Permits = 0],
perNodeParallelOps=64, entriesCnt=1152, locFutsSize=0, reqsSize=64]},
cacheObjProc=GridProcessorAdapter [],
cacheObjCtx=org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryContext@4a3be6a5,
cancelled=true, cancellationReason=null, failCntr=0,
activeFuts=GridConcurrentHashSet [elements=[GridFutureAdapter
[ignoreInterrupts=false, state=INIT, res=null, hash=1579584742],
GridFutureAdapter [ignoreInterrupts=false, state=INIT, res=null,
hash=2059282367], GridFutureAdapter [ignoreInterrupts=false, state=INIT,
res=null, hash=1027006452], GridFutureAdapter [ignoreInterrupts=false,
state=INIT, res=null, hash=950125603], GridFutureAdapter
[ignoreInterrupts=false, state=INIT, res=null, hash=227100877],
GridFutureAdapter [ignoreInterrupts=false, state=INIT, res=null,
hash=741370455], GridFutureAdapter [ignoreInterrupts=false, state=INIT,
res=null, hash=1536478396], GridFutureAdapter [ignoreInterrupts=false,
state=INIT, res=null, hash=1081344572], GridFutureAdapter
[ignoreInterrupts=false, state=INIT, res=null, hash=1538745405],
GridFutureAdapter [ignoreInterrupts=false, state=INIT, res=null,
hash=2000563893], GridFutureAdapter [ignoreInterrupts=false, state=INIT,
res=null, hash=997918120], GridFutureAdapter [ignoreInterrupts=false,
state=INIT, res=null, hash=985679444], GridFutureAdapter
[ignoreInterrupts=false, state=INIT, res=null, hash=1164436797],
GridFutureAdapter [ignoreInterrupts=false, state=INIT, res=null,
hash=954937264], GridFutureAdapter [ignoreInterrupts=false, state=INIT,
res=null, hash=339126187], GridFutureAdapter [ignoreInterrupts=false,
state=INIT, res=null, hash=1053856141], GridFutureAdapter
[ignoreInterrupts=false, state=INIT, res=null, hash=862152124],
GridFutureAdapter [ignoreInterrupts=false, state=INIT, res=null,
hash=1934729582]]],
jobPda=org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$DataStreamerPda@3721177d,
depCls=null, fut=DataStreamerFuture [super=GridFutureAdapter
[ignoreInterrupts=false, state=INIT, res=null, hash=1986676021]],
publicFut=IgniteFuture [orig=DataStreamerFuture [super=GridFutureAdapter
[ignoreInterrupts=false, state=INIT, res=null, hash=1986676021]]],
disconnectErr=null, closed=true, lastFlushTime=1589702649336,
skipStore=false, keepBinary=false,