Re: [go-nuts] Re: Counting semaphore vs Workerpool

2018-05-28 Thread Jesper Louis Andersen
There is also a hybrid model:

Model A: for each work item, create a goroutine. This is easy to program,
but uses additional resources for the goroutines.
Model B: keep a worker pool and feed a channel. This bounds the goroutine
count and requires more code to pull off and make correct.

I'm going to propose Model C: keep the work in an array. Spawn 500 goroutines
and have each of them try to take a message from a channel which acts like a
counting semaphore. Each goroutine marks its item in the array as being
processed. Every once in a while, spawn a new set of 500 goroutines and let
them have the new work. If a goroutine manages to get a message from the
semaphore, it operates; otherwise it terminates. This bounds the goroutine
count, but it avoids having to explicitly manage a worker pool.

If your work is stored persistently on disk, Model C can be pretty alluring
since you can have something like

CREATE INDEX todo_idx ON Work(processed) WHERE processed = false;
CREATE INDEX progress_idx ON Work(in_progress) WHERE in_progress = false;

in an SQL database and then you do:

SELECT W.id FROM Work W WHERE processed = false AND in_progress = false
LIMIT 500;

Due to the partial indexes, this query is fast. And if each worker marks its
item as in_progress, you avoid handing the same work out twice.

Basically, you have a bucket of work and you make sure it is topped up once in
a while. Also, if you drip-feed tokens into the counting semaphore, you have
just built a system which is also able to load-regulate.

It isn't like one model is particularly better than the others. It all
depends on what kind of problem you are trying to solve and what kind of
failure semantics you want your system to have. In particular, I've found
that you need to focus on a system's failure semantics first[0].
[0] This is also the major reason why errors should be treated as values
rather than as control operators in most cases.


[go-nuts] Re: Counting semaphore vs Workerpool

2018-05-26 Thread jake6502
So, it looks to me like 1 million URLs will spin up 1 million goroutines.
That seems unnecessary, and likely to be a problem. I usually default to
the simplest model unless there is a reason to do otherwise. IMHO, here
that would be to spin up as many goroutines as you want working at
once (500), then feed data through a channel. Untested pseudo-code:
func fetchAll(ctx context.Context) {
    workerCount := min(500, len(urls)) // min for ints is a builtin as of Go 1.21
    c := make(chan string)

    // Spin up workers
    for i := 0; i < workerCount; i++ {
        go doWork(ctx, c)
    }
    // Feed them work
    for _, u := range urls {
        if u == "" {
            continue
        }
        _, err := url.Parse(u)
        if err != nil {
            log.Printf("%s returned an error - %v", u, err)
            continue
        }
        c <- u
    }
    // All work sent; signal workers to exit. Note: workers may still be
    // processing at this point.
    close(c)
}

func doWork(ctx context.Context, c chan string) {
    // range exits once c is closed and drained, so no sentinel value
    // (like the empty string) is needed to detect shutdown.
    for u := range c {
        fetch(ctx, u)
    }
}

func fetch(ctx context.Context, u string) {
    req, err := http.NewRequest(http.MethodGet, u, nil)
    if err != nil {
        log.Printf("%s returned an error while creating a request - %v", u, err)
        return
    }
    req = req.WithContext(ctx)
    res, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Printf("%s returned an error while performing a request - %v", u, err)
        return
    }
    // Close the response body when the function returns to prevent resource
    // leakage. https://golang.org/pkg/net/http/#Response
    defer res.Body.Close()
}

I find this basic model simple and easy to reason about. And it should
avoid wasted goroutines. I have not really paid much attention to "fetch";
I assume it was already correct code. I'm not sure if you are expecting to
handle a cancel on the Context. If so, you will want to add a check
for that in your main fetchAll() loop and bail early if it was canceled.


On Saturday, May 26, 2018 at 11:48:28 AM UTC-4, Karthik Rao wrote:
>
> I am writing an application that has to fetch data from, say, a million 
> URLs. I currently have an implementation which looks like the code below:
>
> // Make sure that we have at most 500 goroutines fetching from URLs.
> sem := make(chan struct{}, min(500, len(urls)))
> // Check that each URL in the request is valid and, if so, spawn a
> // goroutine to fetch data.
> for _, u := range urls {
>     _, err := url.Parse(u)
>     if err != nil {
>         log.Printf("%s returned an error - %v", u, err)
>         continue
>     }
>     go fetch(ctx, sem, u)
> }
>
> func fetch(ctx context.Context, sem chan struct{}, u string) {
>     sem <- struct{}{}
>     defer func() { <-sem }()
>     req, err := http.NewRequest(http.MethodGet, u, nil)
>     if err != nil {
>         log.Printf("%s returned an error while creating a request - %v", u, err)
>         return
>     }
>     req = req.WithContext(ctx)
>     res, err := http.DefaultClient.Do(req)
>     if err != nil {
>         log.Printf("%s returned an error while performing a request - %v", u, err)
>         return
>     }
>     // Close the response body when the function returns to prevent resource
>     // leakage. https://golang.org/pkg/net/http/#Response
>     defer res.Body.Close()
> }
>
> Would this application choke when a million goroutines are spawned and are 
> waiting for a place on the sem channel?  I have profiled my code using 
> pprof and see no problems when I tested it with 50k URLs.
>
> What is the cost of a goroutine waiting on the semaphore channel? Would it 
> be ~2KB?
>
> Is using a worker pool like the one mentioned here better? What would be 
> the advantages? I am of the opinion that the runtime scheduler is a 
> better judge when it comes to managing goroutines.
>
> Another question: would it be better to acquire the semaphore within the 
> loop, so that I limit the number of goroutines spawned? Mr Dave Cheney 
> suggested otherwise in his talk here.
>
> Any other suggestions are also welcome.
>
> TIA!
>
