Thanks this looks interesting
On Tue, Mar 7, 2017 at 2:28 AM, Luke Shannon <[email protected]> wrote:
> I did something similar to Jake and Lyndon's suggestions. It was very fast
> and scaled well as members were added. It returned a summary object that
> the client app could use to reports on the results of the ingest (basically
> ensuring all files we ingested by some member). This needed the names of
> the files to be the key of the object for it to work.
>
> Here is the code:
>
> import java.io.BufferedReader;
> import java.io.File;
> import java.io.FileReader;
> import java.io.IOException;
> import java.util.Properties;
>
> import com.gemstone.gemfire.LogWriter;
> import com.gemstone.gemfire.cache.Cache;
> import com.gemstone.gemfire.cache.CacheFactory;
> import com.gemstone.gemfire.cache.Declarable;
> import com.gemstone.gemfire.cache.Region;
> import com.gemstone.gemfire.cache.execute.FunctionAdapter;
> import com.gemstone.gemfire.cache.execute.FunctionContext;
> import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
> import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
> import com.gemstone.gemfire.distributed.DistributedMember;
> import com.google.gson.Gson;
>
> /**
> * Reads Bulk JSON files into Gemfire in parrallel across the cluster
> * It deletes each file as its loaded
> *
> * @author lshannon
> *
> */
> public class DataLoadFunction extends FunctionAdapter implements
> Declarable {
> public static final String ID = DataLoadFunction.class.getSimpleName();
>
> private LogWriter logger;
> private DistributedMember member;
> private String backUpDirectory;
>
> private static final long serialVersionUID = -7759261808685094980L;
>
> @Override
> public void execute(FunctionContext context) {
> if (context.getArguments() == null) {
> System.out.println("Must Provide the location of the data folder when
> executing the function.");
> context.getResultSender().lastResult("Must Provide the location of the
> data folder when executing the function.");
> }
> Cache cache = CacheFactory.getAnyInstance();
> this.member = cache.getDistributedSystem().getDistributedMember();
> logger = cache.getDistributedSystem().getLogWriter();
> Object[] arg = (Object[]) context.getArguments();
> backUpDirectory = (String) arg[0];
> RegionFunctionContext rfc = (RegionFunctionContext) context;
> String loadingSummary = null;
> try {
> loadingSummary = loadSegments(rfc.getDataSet());
> context.getResultSender().lastResult(loadingSummary);
> }
> catch (Exception e) {
> context.getResultSender().lastResult(e);
> }
> }
>
> /**
> * This function passes through the folder of JSON files. If the key, which
> * is the name of the file, would be a primary on this node its loaded by
> * this member into the cluster. Otherwise its ignored add will be picked
> up by the correct member
> *
> * @return
> */
> @SuppressWarnings("unchecked")
> private String loadSegments(@SuppressWarnings("rawtypes") Region region) {
> logger.info("Started loading segments from: " + backUpDirectory);
> // summary of the loading process
> long startTime = 0, endTime = 0;
> int totalSegments = 0, loadedSegments = 0, skippedSegments = 0;
> startTime = System.currentTimeMillis();
> BufferedReader br = null;
> File segments = new File(backUpDirectory);
> logger.info("Loading From: " + backUpDirectory + " " +
> segments.list().length + " file to process");
> String[] files = segments.list();
> Gson gson = new Gson();
> for (int i = 0; i < files.length; i++) {
> if (files[i].endsWith(".json")) {
> try {
> //name of the file is the key
> String key = files[i].substring(0, files[i].indexOf("."));
> //this is an entry, but may not be one for this server
> totalSegments++;
> //this will return the member that would be the primary copy for this
> data, if its this
> //member running the function, we will do the put otherwise its skipped
> if (this.member.equals(PartitionRegionHelper.getPrimaryMemberForKey(region,
> key))) {
> //read the file
> br = new BufferedReader(new FileReader(backUpDirectory + files[i]));
> //get an array of Segment objects http://stackoverflow.com/
> questions/3763937/gson-and-deserializing-an-array-of-
> objects-with-arrays-in-it
> Segment[] segmentValue = gson.fromJson(br,Segment[].class);
> region.put(key, segmentValue);
> loadedSegments++;
> } else {
> skippedSegments++;
> }
> }
> catch (IOException e) {
> this.logger.error(e);
> }
> //clean up
> finally {
> if (br != null) {
> try {
> br.close();
> } catch (IOException e) {
> this.logger.error(e);
> }
> }
> }
> }
> }
> endTime = System.currentTimeMillis();
> //return the summary
> DataLoadFunction.LoadingSummary loadingSummary = new
> LoadingSummary(member.toString(), startTime, endTime, totalSegments,
> skippedSegments, loadedSegments);
> logger.info("Loading Complete: " + loadingSummary.toString());
> return loadingSummary.toString();
> }
>
> @Override
> public String getId() {
> return ID;
> }
>
> @Override
> public boolean hasResult() {
> return true;
> }
>
> @Override
> public boolean isHA() {
> return true;
> }
>
> @Override
> public boolean optimizeForWrite() {
> return true;
> }
>
> @Override
> public void init(Properties arg0) {
> }
>
> /**
> * Convenience class for storing the results of a segment load operation
> * @author lshannon
> *
> */
> class LoadingSummary {
> private String memberName;
> private long startTime;
> private long endTime;
> private int totalSegments;
> private int segmentsSkipped;
> private int segmentsLoaded;
>
> public LoadingSummary(String memberName, long startTime, long endTime, int
> totalSegments, int segmentsSkipped, int segmentsLoaded) {
> this.memberName = memberName;
> this.startTime = startTime;
> this.endTime = endTime;
> this.totalSegments = totalSegments;
> this.segmentsSkipped = segmentsSkipped;
> this.segmentsLoaded = segmentsLoaded;
> }
> public String getMemberName() {
> return memberName;
> }
>
> public long getStartTime() {
> return startTime;
> }
>
> public long getEndTime() {
> return endTime;
> }
>
> public int getTotalSegments() {
> return totalSegments;
> }
>
> public int getSegmentsSkipped() {
> return segmentsSkipped;
> }
>
> public int getSegmentsLoaded() {
> return segmentsLoaded;
> }
>
>
> @Override
> public String toString() {
> return "LoadingSummary [memberName=" + memberName + ", startTime="
> + startTime + ", endTime=" + endTime + ", totalSegments="
> + totalSegments + ", segmentsSkipped=" + segmentsSkipped
> + ", segmentsLoaded=" + segmentsLoaded + "]";
> }
> }
>
> }
>
> On Mon, Mar 6, 2017 at 3:42 PM, Amit Pandey <[email protected]>
> wrote:
>
>> Hey Lyndon,
>>
>> Poor dev here, cant hire you. Not in that kind of position :)
>>
>> Hey Jake,
>>
>> Makes sense. Will try your approach, with DataSerializable.
>>
>> Hi Charlie,
>>
>> Okay. I think yea, yes I understand GC needs to be tuned. Also currently
>> I do use Bulk sizes like I put 500 items and then clear the bulk data and
>> then fill up 500 again and retry. using DataSerializable with this approach
>> should be helpful I guess.
>>
>> Thanks everyone, I will be trying out things and update you guys
>>
>> On Tue, Mar 7, 2017 at 12:48 AM, Lyndon Adams <[email protected]>
>> wrote:
>>
>>> Oh my god Charlie you are taking my money making opportunities away from
>>> me. Basically he is right plus you got to add some black GC magic in to the
>>> mix to optimise pauses.
>>>
>>>
>>> On 6 Mar 2017, at 18:57, Charlie Black <[email protected]> wrote:
>>>
>>> putAll() is the bulk operation for geode. Plain and simple.
>>>
>>> The other techniques outlined in this thread are all efforts to go
>>> really fast by separating concerns at multiple levels. Or taking
>>> advantage of the fact there are other system and CPUs that are in the
>>> physical architecture.
>>>
>>> Example: The GC comment - when creating the domain objects sometimes
>>> that causes GC pressure which reduces throughput. I typically look at
>>> bulk sizes to reduce that concern.
>>>
>>> Consider all suggestions then profile your options and choose the right
>>> pattern for your app.
>>>
>>> Regards,
>>> Charlie
>>>
>>> ---
>>> Charlie Black
>>> 858.480.9722 <(858)%20480-9722> | [email protected]
>>>
>>> On Mar 6, 2017, at 10:42 AM, Amit Pandey <[email protected]>
>>> wrote:
>>>
>>> Hey Jake,
>>>
>>> Thanks. I am a bot confused so a put should be faster than putAll ?
>>>
>>> John,
>>>
>>> I need to setup all data so that they can be queried. So I don't think
>>> CacheLoader works for me. Those data are the results of a very large and
>>> expensive computations and doing them dynamically will be costly.
>>>
>>> We have a time window to setup the system because after that some other
>>> jobs will start. Currently its taking 2.4 seconds to insert 30,000 data and
>>> its great. But I am just trying to optimize if it can be made faster.
>>>
>>> Regards
>>>
>>> On Tue, Mar 7, 2017 at 12:01 AM, John Blum <[email protected]> wrote:
>>>
>>>> Amit-
>>>>
>>>> Note, a CacheLoader does not necessarily imply "loading data from a
>>>> database"; it can load data from any [external] data source and does so on
>>>> demand (i.e. lazily, on a cache miss). However, as Mike points out, this
>>>> might not work for your Use Case in situations where you are querying, for
>>>> instance.
>>>>
>>>> I guess the real question here is, what is the requirement to pre-load
>>>> this data quickly? What is the driving requirement here?
>>>>
>>>> For instance, is the need to be able to bring another system online
>>>> quickly in case of "failover". If so, perhaps an architectural change is
>>>> more appropriate such as an Active/Passive arch (using WAN).
>>>>
>>>> -j
>>>>
>>>>
>>>>
>>>> On Mon, Mar 6, 2017 at 9:45 AM, Amit Pandey <[email protected]>
>>>> wrote:
>>>>
>>>>> We might need that actually...problem is we cant use dataloader
>>>>> because we are not loading from database. So we have to use putall. Its
>>>>> taking 2 seconds for over 30000 data. If implenting it will bring it down
>>>>> that will be helpful.
>>>>> On 06-Mar-2017 10:05 pm, "Michael Stolz" <[email protected]> wrote:
>>>>>
>>>>>> Of course if you're REALLY in need of speed you can write your own
>>>>>> custom implementations of toData and fromData for the DataSerializable
>>>>>> Interface.
>>>>>>
>>>>>> I haven't seen anyone need that much speed in a long time though.
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Mike Stolz
>>>>>> Principal Engineer - Gemfire Product Manager
>>>>>> Mobile: 631-835-4771 <(631)%20835-4771>
>>>>>>
>>>>>> On Mar 3, 2017 11:16 PM, "Real Wes" <[email protected]> wrote:
>>>>>>
>>>>>>> Amit,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> John and Mike’s advice about tradeoffs is worth heeding. You’ll find
>>>>>>> that your speed is probably just fine with putAll but if you just have
>>>>>>> to
>>>>>>> have NOS in your tank, you might consider - since you’re inside a
>>>>>>> function
>>>>>>> - do the putAll from the function into your region but change the region
>>>>>>> scope to distributed-no-ack. See: https://geode.apache.org/docs/
>>>>>>> guide/developing/distributed_regions/choosing_level_of_dist.html
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Wes
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *From:* Amit Pandey [mailto:[email protected]]
>>>>>>> *Sent:* Friday, March 3, 2017 12:26 PM
>>>>>>> *To:* [email protected]
>>>>>>> *Subject:* Re: fastest way to bulk insert in geode
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Hey John ,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks I am planning to use Spring XD. But my current usecase is
>>>>>>> that I am aggregating and doing some computes in a Function and then I
>>>>>>> want
>>>>>>> to populate it with the values I have a map , is region.putAll the
>>>>>>> fastest?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Regards
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Mar 3, 2017 at 10:52 PM, John Blum <[email protected]> wrote:
>>>>>>>
>>>>>>> You might consider using the Snapshot service
>>>>>>> <http://gemfire90.docs.pivotal.io/geode/managing/cache_snapshots/chapter_overview.html>
>>>>>>> [1]
>>>>>>> if you previously had data in a Region of another Cluster (for
>>>>>>> instance).
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> If the data is coming externally, then *Spring XD
>>>>>>> <http://projects.spring.io/spring-xd/> *[2] is a great tool for
>>>>>>> moving (streaming) data from a source
>>>>>>> <http://docs.spring.io/spring-xd/docs/1.3.1.RELEASE/reference/html/#sources>
>>>>>>> [3]
>>>>>>> to a sink
>>>>>>> <http://docs.spring.io/spring-xd/docs/1.3.1.RELEASE/reference/html/#sinks>
>>>>>>> [4].
>>>>>>> It also allows you to perform all manners of
>>>>>>> transformations/conversions,
>>>>>>> trigger events, and so and so forth.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -j
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> [1] http://gemfire90.docs.pivotal.io/geode/managing/cache_sn
>>>>>>> apshots/chapter_overview.html
>>>>>>>
>>>>>>> [2] http://projects.spring.io/spring-xd/
>>>>>>>
>>>>>>> [3] http://docs.spring.io/spring-xd/docs/1.3.1.RELEASE/refer
>>>>>>> ence/html/#sources
>>>>>>>
>>>>>>> [4] http://docs.spring.io/spring-xd/docs/1.3.1.RELEASE/refer
>>>>>>> ence/html/#sinks
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Mar 3, 2017 at 9:13 AM, Amit Pandey <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>> Hey Guys,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Whats the fastest way to do bulk insert in a region?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I am using region.putAll , is there any alternative/faster API?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> regards
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> -John
>>>>>>>
>>>>>>> john.blum10101 (skype)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>>
>>>> --
>>>> -John
>>>> john.blum10101 (skype)
>>>>
>>>
>>>
>>>
>>>
>>
>
>
> --
> Luke Shannon | Platform Engineering | Pivotal
> -------------------------------------------------------------------------
>
> Mobile:416-571-9495
> Join the Toronto Pivotal Usergroup: http://www.meetup.
> com/Toronto-Pivotal-User-Group/
>