I did something similar to Jake and Lyndon's suggestions. It was very fast
and scaled well as members were added. It returned a summary object that
the client app could use to reports on the results of the ingest (basically
ensuring all files we ingested by some member). This needed the names of
the files to be the key of the object for it to work.
Here is the code:
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.google.gson.Gson;
/**
* Reads Bulk JSON files into Gemfire in parrallel across the cluster
* It deletes each file as its loaded
*
* @author lshannon
*
*/
public class DataLoadFunction extends FunctionAdapter implements Declarable
{
public static final String ID = DataLoadFunction.class.getSimpleName();
private LogWriter logger;
private DistributedMember member;
private String backUpDirectory;
private static final long serialVersionUID = -7759261808685094980L;
@Override
public void execute(FunctionContext context) {
if (context.getArguments() == null) {
System.out.println("Must Provide the location of the data folder when
executing the function.");
context.getResultSender().lastResult("Must Provide the location of the data
folder when executing the function.");
}
Cache cache = CacheFactory.getAnyInstance();
this.member = cache.getDistributedSystem().getDistributedMember();
logger = cache.getDistributedSystem().getLogWriter();
Object[] arg = (Object[]) context.getArguments();
backUpDirectory = (String) arg[0];
RegionFunctionContext rfc = (RegionFunctionContext) context;
String loadingSummary = null;
try {
loadingSummary = loadSegments(rfc.getDataSet());
context.getResultSender().lastResult(loadingSummary);
}
catch (Exception e) {
context.getResultSender().lastResult(e);
}
}
/**
* This function passes through the folder of JSON files. If the key, which
* is the name of the file, would be a primary on this node its loaded by
* this member into the cluster. Otherwise its ignored add will be picked up
by the correct member
*
* @return
*/
@SuppressWarnings("unchecked")
private String loadSegments(@SuppressWarnings("rawtypes") Region region) {
logger.info("Started loading segments from: " + backUpDirectory);
// summary of the loading process
long startTime = 0, endTime = 0;
int totalSegments = 0, loadedSegments = 0, skippedSegments = 0;
startTime = System.currentTimeMillis();
BufferedReader br = null;
File segments = new File(backUpDirectory);
logger.info("Loading From: " + backUpDirectory + " " +
segments.list().length + " file to process");
String[] files = segments.list();
Gson gson = new Gson();
for (int i = 0; i < files.length; i++) {
if (files[i].endsWith(".json")) {
try {
//name of the file is the key
String key = files[i].substring(0, files[i].indexOf("."));
//this is an entry, but may not be one for this server
totalSegments++;
//this will return the member that would be the primary copy for this data,
if its this
//member running the function, we will do the put otherwise its skipped
if (this.member.equals(PartitionRegionHelper.getPrimaryMemberForKey(region,
key))) {
//read the file
br = new BufferedReader(new FileReader(backUpDirectory + files[i]));
//get an array of Segment objects
http://stackoverflow.com/questions/3763937/gson-and-deserializing-an-array-of-objects-with-arrays-in-it
Segment[] segmentValue = gson.fromJson(br,Segment[].class);
region.put(key, segmentValue);
loadedSegments++;
} else {
skippedSegments++;
}
}
catch (IOException e) {
this.logger.error(e);
}
//clean up
finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
this.logger.error(e);
}
}
}
}
}
endTime = System.currentTimeMillis();
//return the summary
DataLoadFunction.LoadingSummary loadingSummary = new
LoadingSummary(member.toString(), startTime, endTime, totalSegments,
skippedSegments, loadedSegments);
logger.info("Loading Complete: " + loadingSummary.toString());
return loadingSummary.toString();
}
@Override
public String getId() {
return ID;
}
@Override
public boolean hasResult() {
return true;
}
@Override
public boolean isHA() {
return true;
}
@Override
public boolean optimizeForWrite() {
return true;
}
@Override
public void init(Properties arg0) {
}
/**
* Convenience class for storing the results of a segment load operation
* @author lshannon
*
*/
class LoadingSummary {
private String memberName;
private long startTime;
private long endTime;
private int totalSegments;
private int segmentsSkipped;
private int segmentsLoaded;
public LoadingSummary(String memberName, long startTime, long endTime, int
totalSegments, int segmentsSkipped, int segmentsLoaded) {
this.memberName = memberName;
this.startTime = startTime;
this.endTime = endTime;
this.totalSegments = totalSegments;
this.segmentsSkipped = segmentsSkipped;
this.segmentsLoaded = segmentsLoaded;
}
public String getMemberName() {
return memberName;
}
public long getStartTime() {
return startTime;
}
public long getEndTime() {
return endTime;
}
public int getTotalSegments() {
return totalSegments;
}
public int getSegmentsSkipped() {
return segmentsSkipped;
}
public int getSegmentsLoaded() {
return segmentsLoaded;
}
@Override
public String toString() {
return "LoadingSummary [memberName=" + memberName + ", startTime="
+ startTime + ", endTime=" + endTime + ", totalSegments="
+ totalSegments + ", segmentsSkipped=" + segmentsSkipped
+ ", segmentsLoaded=" + segmentsLoaded + "]";
}
}
}
On Mon, Mar 6, 2017 at 3:42 PM, Amit Pandey <[email protected]>
wrote:
> Hey Lyndon,
>
> Poor dev here, cant hire you. Not in that kind of position :)
>
> Hey Jake,
>
> Makes sense. Will try your approach, with DataSerializable.
>
> Hi Charlie,
>
> Okay. I think yea, yes I understand GC needs to be tuned. Also currently I
> do use Bulk sizes like I put 500 items and then clear the bulk data and
> then fill up 500 again and retry. using DataSerializable with this approach
> should be helpful I guess.
>
> Thanks everyone, I will be trying out things and update you guys
>
> On Tue, Mar 7, 2017 at 12:48 AM, Lyndon Adams <[email protected]>
> wrote:
>
>> Oh my god Charlie you are taking my money making opportunities away from
>> me. Basically he is right plus you got to add some black GC magic in to the
>> mix to optimise pauses.
>>
>>
>> On 6 Mar 2017, at 18:57, Charlie Black <[email protected]> wrote:
>>
>> putAll() is the bulk operation for geode. Plain and simple.
>>
>> The other techniques outlined in this thread are all efforts to go really
>> fast by separating concerns at multiple levels. Or taking advantage of
>> the fact there are other system and CPUs that are in the physical
>> architecture.
>>
>> Example: The GC comment - when creating the domain objects sometimes that
>> causes GC pressure which reduces throughput. I typically look at bulk
>> sizes to reduce that concern.
>>
>> Consider all suggestions then profile your options and choose the right
>> pattern for your app.
>>
>> Regards,
>> Charlie
>>
>> ---
>> Charlie Black
>> 858.480.9722 <(858)%20480-9722> | [email protected]
>>
>> On Mar 6, 2017, at 10:42 AM, Amit Pandey <[email protected]>
>> wrote:
>>
>> Hey Jake,
>>
>> Thanks. I am a bot confused so a put should be faster than putAll ?
>>
>> John,
>>
>> I need to setup all data so that they can be queried. So I don't think
>> CacheLoader works for me. Those data are the results of a very large and
>> expensive computations and doing them dynamically will be costly.
>>
>> We have a time window to setup the system because after that some other
>> jobs will start. Currently its taking 2.4 seconds to insert 30,000 data and
>> its great. But I am just trying to optimize if it can be made faster.
>>
>> Regards
>>
>> On Tue, Mar 7, 2017 at 12:01 AM, John Blum <[email protected]> wrote:
>>
>>> Amit-
>>>
>>> Note, a CacheLoader does not necessarily imply "loading data from a
>>> database"; it can load data from any [external] data source and does so on
>>> demand (i.e. lazily, on a cache miss). However, as Mike points out, this
>>> might not work for your Use Case in situations where you are querying, for
>>> instance.
>>>
>>> I guess the real question here is, what is the requirement to pre-load
>>> this data quickly? What is the driving requirement here?
>>>
>>> For instance, is the need to be able to bring another system online
>>> quickly in case of "failover". If so, perhaps an architectural change is
>>> more appropriate such as an Active/Passive arch (using WAN).
>>>
>>> -j
>>>
>>>
>>>
>>> On Mon, Mar 6, 2017 at 9:45 AM, Amit Pandey <[email protected]>
>>> wrote:
>>>
>>>> We might need that actually...problem is we cant use dataloader because
>>>> we are not loading from database. So we have to use putall. Its taking 2
>>>> seconds for over 30000 data. If implenting it will bring it down that will
>>>> be helpful.
>>>> On 06-Mar-2017 10:05 pm, "Michael Stolz" <[email protected]> wrote:
>>>>
>>>>> Of course if you're REALLY in need of speed you can write your own
>>>>> custom implementations of toData and fromData for the DataSerializable
>>>>> Interface.
>>>>>
>>>>> I haven't seen anyone need that much speed in a long time though.
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Mike Stolz
>>>>> Principal Engineer - Gemfire Product Manager
>>>>> Mobile: 631-835-4771 <(631)%20835-4771>
>>>>>
>>>>> On Mar 3, 2017 11:16 PM, "Real Wes" <[email protected]> wrote:
>>>>>
>>>>>> Amit,
>>>>>>
>>>>>>
>>>>>>
>>>>>> John and Mike’s advice about tradeoffs is worth heeding. You’ll find
>>>>>> that your speed is probably just fine with putAll but if you just have to
>>>>>> have NOS in your tank, you might consider - since you’re inside a
>>>>>> function
>>>>>> - do the putAll from the function into your region but change the region
>>>>>> scope to distributed-no-ack. See: https://geode.apache.org/docs/
>>>>>> guide/developing/distributed_regions/choosing_level_of_dist.html
>>>>>>
>>>>>>
>>>>>>
>>>>>> Wes
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From:* Amit Pandey [mailto:[email protected]]
>>>>>> *Sent:* Friday, March 3, 2017 12:26 PM
>>>>>> *To:* [email protected]
>>>>>> *Subject:* Re: fastest way to bulk insert in geode
>>>>>>
>>>>>>
>>>>>>
>>>>>> Hey John ,
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks I am planning to use Spring XD. But my current usecase is that
>>>>>> I am aggregating and doing some computes in a Function and then I want to
>>>>>> populate it with the values I have a map , is region.putAll the fastest?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Regards
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 3, 2017 at 10:52 PM, John Blum <[email protected]> wrote:
>>>>>>
>>>>>> You might consider using the Snapshot service
>>>>>> <http://gemfire90.docs.pivotal.io/geode/managing/cache_snapshots/chapter_overview.html>
>>>>>> [1]
>>>>>> if you previously had data in a Region of another Cluster (for instance).
>>>>>>
>>>>>>
>>>>>>
>>>>>> If the data is coming externally, then *Spring XD
>>>>>> <http://projects.spring.io/spring-xd/> *[2] is a great tool for
>>>>>> moving (streaming) data from a source
>>>>>> <http://docs.spring.io/spring-xd/docs/1.3.1.RELEASE/reference/html/#sources>
>>>>>> [3]
>>>>>> to a sink
>>>>>> <http://docs.spring.io/spring-xd/docs/1.3.1.RELEASE/reference/html/#sinks>
>>>>>> [4].
>>>>>> It also allows you to perform all manners of transformations/conversions,
>>>>>> trigger events, and so and so forth.
>>>>>>
>>>>>>
>>>>>>
>>>>>> -j
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> [1] http://gemfire90.docs.pivotal.io/geode/managing/cache_sn
>>>>>> apshots/chapter_overview.html
>>>>>>
>>>>>> [2] http://projects.spring.io/spring-xd/
>>>>>>
>>>>>> [3] http://docs.spring.io/spring-xd/docs/1.3.1.RELEASE/refer
>>>>>> ence/html/#sources
>>>>>>
>>>>>> [4] http://docs.spring.io/spring-xd/docs/1.3.1.RELEASE/refer
>>>>>> ence/html/#sinks
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 3, 2017 at 9:13 AM, Amit Pandey <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>> Hey Guys,
>>>>>>
>>>>>>
>>>>>>
>>>>>> Whats the fastest way to do bulk insert in a region?
>>>>>>
>>>>>>
>>>>>>
>>>>>> I am using region.putAll , is there any alternative/faster API?
>>>>>>
>>>>>>
>>>>>>
>>>>>> regards
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> -John
>>>>>>
>>>>>> john.blum10101 (skype)
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>
>>>
>>> --
>>> -John
>>> john.blum10101 (skype)
>>>
>>
>>
>>
>>
>
--
Luke Shannon | Platform Engineering | Pivotal
-------------------------------------------------------------------------
Mobile:416-571-9495
Join the Toronto Pivotal Usergroup:
http://www.meetup.com/Toronto-Pivotal-User-Group/