visit
This is the piece which runs the multiple jobs asynchronously. It has a priority queue (balancer.go and pool.go) which hands off incoming requests to a set of managed workers.
// Balancer is the load balancer handed out by GetBalancer. It dispatches
// incoming requests to the least loaded worker in its pool and tracks
// completion so it can shut down cleanly.
type Balancer struct {
// It's the pool of Workers, which is itself a priority queue based on a min heap.
pool Pool
// This channel is used to receive a request instance from the caller. After getting the request it is dispatched
// to the most lightly loaded worker.
jobChannel chan *Request
// This channel is used by the workers. After processing a task, a worker uses this channel to let the balancer know
// that it is done and able to take new requests from its request channel.
done chan *Worker
// It's the number of queued requests.
queuedItems int
// The close channel. When the Close method is called by any calling goroutine sending a channel of boolean, the
// balancer waits for all the requests to be processed, then closes all the workers, closes all its own loops and
// then finally responds by sending boolean true on the channel passed by the caller, confirming that all the inner
// loops are closed and the balancer is shut down.
closeChannel chan chan bool
}
The balancer is backed by a min-heap priority queue, so when a new task arrives it is assigned to the least loaded worker.
// Pool is the balancer's list of workers. It implements heap.Interface
// (Len/Less/Swap plus Push/Pop) so it can be used as a min heap ordered
// by each worker's pending job count.
type Pool []*Worker

// Len returns the number of workers in the pool.
func (p Pool) Len() int {
	return len(p)
}

// Less orders workers by pending job count, so the heap root is always
// the least loaded worker.
func (p Pool) Less(i, j int) bool {
	return p[i].pending < p[j].pending
}

// Swap exchanges the workers at positions i and j. A value receiver is
// sufficient and keeps the receiver type consistent with Len and Less:
// the copied slice header still refers to the same backing array.
func (p Pool) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
}

// Push appends a worker to the pool. Called by heap.Push; x must be a *Worker.
func (p *Pool) Push(x interface{}) {
	worker := x.(*Worker)
	*p = append(*p, worker)
}

// Pop removes and returns the last worker in the pool. Called by heap.Pop,
// which has already moved the heap root to the last position.
func (p *Pool) Pop() interface{} {
	old := *p
	n := len(old)
	worker := old[n-1]
	old[n-1] = nil // clear the slot so the backing array does not retain the *Worker
	*p = old[0 : n-1]
	return worker
}
request := rio.BuildRequests(context,
    (<callback of service 1>.WithTimeOut(100 ms).WithRetry(3)).
        FollowedBy(<function for transforming data from service 1 response to request or partial request of service 2>,
            <callback of service 2>).
        FollowedBy(<function for transforming data from service 2 response to request or partial request of service 3>,
            <callback of service 3>))
// SampleHandler demonstrates wiring two dependent service calls into a rio
// pipeline, posting it to the balancer, and collecting both responses.
func SampleHandler(w http.ResponseWriter, r *http.Request) {
	// Create the load balancer. In real code this should be created only once
	// (e.g. at startup), not on every request.
	balancer := rio.GetBalancer(10, 2) // 10 threads; second arg presumably queue/buffer size — confirm against rio docs
	// Set up the callbacks. The second call depends on the first call's result,
	// so its first argument is left as a placeholder to be filled in later.
	callback1 := GetNameById("Some Name")
	callback2 := GetStreetAddressByNameAndLocationId(rio.EMPTY_ARG_PLACEHOLDER, "Some Location ID")
	// Set up the pipeline: callback1 runs first (10ms timeout, 3 retries), then
	// Call1ToCall2 transforms its response into the input for callback2.
	request := rio.BuildRequests(context.Background(),
		rio.NewFutureTask(callback1).WithMilliSecondTimeout(10).WithRetry(3), 2).
		FollowedBy(Call1ToCall2, rio.NewFutureTask(callback2).WithMilliSecondTimeout(20))
	// Post the job and wait for the whole pipeline to complete.
	balancer.PostJob(request)
	<-request.CompletedChannel
	// Collect the responses; report failures instead of silently ignoring them.
	response1, err := request.GetResponse(0)
	if err != nil {
		fmt.Println("call 1 failed:", err)
	} else {
		// Do something with the response
		fmt.Println(response1)
	}
	response2, err := request.GetResponse(1)
	if err != nil {
		fmt.Println("call 2 failed:", err)
	} else {
		// Do something with the response
		fmt.Println(response2)
	}
}
Once the chaining is done (the BuildRequests call shown above), we post the job like this:
balancer.PostJob(request)
<-request.CompletedChannel
request.GetOnlyResponse()
request.GetResponse(index) //---0,1,2
If any job fails, its response will be an empty response, specifically rio.EMPTY_CALLBACK_RESPONSE.
Also published on Medium’s subdomain: