An Intro to Rio: A Lightweight Job Scheduler in Go

by Supratim Samanta (@susamn), July 6th, 2021

Too Long; Didn't Read

Web apps that combine data from several sources tend to accumulate boilerplate and unmanaged goroutines. Rio tries to solve this problem by introducing two concepts: an asynchronous job processor backed by a priority queue, and easy chaining of dependent calls. The job processor is the piece that runs multiple jobs asynchronously. Rio also provides easy semantics to join multiple data sources based on their output and input types, while keeping the data sources decoupled from each other. This makes creating new APIs, or resolvers for GraphQL APIs, a breeze. Calls that depend on each other's data are chained together using closures and function types and run in one goroutine.

What is RIO?

Rio is a lightweight job scheduler for Go. It also provides very easy semantics to join multiple data sources based on their output and input types, while keeping the data sources decoupled from each other. This makes creating new APIs, or resolvers for GraphQL APIs, a breeze.


Concern

We often write web apps that connect to different data sources, combine the data obtained from these sources, and then do some more work on it. Along the way, we write a lot of boilerplate to transform one data type into another. In the absence of a proper job scheduler, we also create goroutines ad hoc and without proper management. The result is unmanageable code, and it becomes even harder to update later, especially when a new member joins the team.


Rio tries to solve this problem by introducing two concepts.


An asynchronous job processor

This is the piece that runs multiple jobs asynchronously. It has a priority queue (balancer.go and pool.go) which hands off incoming requests to a set of managed workers.

// The Balancer struct. It is used inside the GetBalancer method to provide a load balancer to the caller.
type Balancer struct {

	// The pool of Workers, which is itself a priority queue based on a min heap.
	pool Pool

	// This channel is used to receive a request instance from the caller. After getting the request, it is dispatched
	// to the most lightly loaded worker.
	jobChannel chan *Request

	// This channel is used by the worker. After processing a task, a worker uses this channel to let the balancer know
	// that it is done and able to take new requests from its request channel.
	done chan *Worker

	// The number of queued requests.
	queuedItems int

	// The close channel. When the Close method is called by any calling goroutine, sending a channel of boolean, the
	// balancer waits for all the requests to be processed, then closes all the workers, closes all its own loops, and
	// then finally responds by sending boolean true to the channel passed by the caller, confirming that all the inner
	// loops are closed and the balancer is shut down.
	closeChannel chan chan bool
}
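
The closeChannel comment above describes a channel-of-channels handshake. Here is a minimal sketch of that idea. It is not Rio's actual code: miniBalancer, newMiniBalancer, and runAndClose are hypothetical names, jobs are plain functions instead of *Request values, and there are no workers, only the loop that owns the channels.

```go
package main

import "fmt"

// miniBalancer is a hypothetical, stripped-down stand-in for the Balancer
// above. It keeps only the two channels needed to show the handshake.
type miniBalancer struct {
	jobChannel   chan func()
	closeChannel chan chan bool
}

// newMiniBalancer starts the single loop that receives jobs and close requests.
func newMiniBalancer() *miniBalancer {
	b := &miniBalancer{
		jobChannel:   make(chan func(), 16),
		closeChannel: make(chan chan bool),
	}
	go b.loop()
	return b
}

func (b *miniBalancer) loop() {
	for {
		select {
		case job := <-b.jobChannel:
			job()
		case reply := <-b.closeChannel:
			// Run whatever was queued before Close was called, then confirm.
			for {
				select {
				case job := <-b.jobChannel:
					job()
				default:
					reply <- true // tell the caller that the loop is done
					return
				}
			}
		}
	}
}

// Close hands the loop a reply channel and blocks until the loop confirms.
func (b *miniBalancer) Close() bool {
	reply := make(chan bool)
	b.closeChannel <- reply
	return <-reply
}

// runAndClose queues n jobs, shuts the balancer down, and reports how many ran.
func runAndClose(n int) int {
	b := newMiniBalancer()
	ran := 0 // written only by the loop goroutine, read after Close returns
	for i := 0; i < n; i++ {
		b.jobChannel <- func() { ran++ }
	}
	b.Close()
	return ran
}

func main() {
	fmt.Println(runAndClose(5)) // prints 5
}
```

Because the caller supplies its own reply channel, several goroutines could request a shutdown confirmation without sharing state, which is presumably why the field is a chan chan bool rather than a plain chan bool.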


The balancer is implemented with a min heap priority queue. When assigning a new task, it picks the least loaded worker, which is always at the top of the heap.


To implement the min heap, we just need to implement the five methods of the heap interface from Go's container/heap package (Len, Less, Swap, Push and Pop) on the pool, like this:


// The pool is a list of workers. The pool is also a priority queue.
type Pool []*Worker

func (p Pool) Len() int {
	return len(p)
}

func (p Pool) Less(i, j int) bool {
	return p[i].pending < p[j].pending
}

func (p *Pool) Swap(i, j int) {
	(*p)[i], (*p)[j] = (*p)[j], (*p)[i]
}

func (p *Pool) Push(x interface{}) {
	//n := len(*p)
	item := x.(*Worker)
	//item.index = n
	*p = append(*p, item)
}

func (p *Pool) Pop() interface{} {
	old := *p
	n := len(old)
	item := old[n-1]
	//item.index = 0 // for safety
	*p = old[0 : n-1]
	return item
}
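
To show how such a pool behaves once it is handed to container/heap, here is a small self-contained sketch. It is an illustration under assumptions, not Rio's code: worker, pool, dispatch, complete, and demo are hypothetical names, and the sketch keeps an index field (commented out in the listing above) so that heap.Fix can reorder a worker after its load changes.

```go
package main

import (
	"container/heap"
	"fmt"
)

// worker is a hypothetical stand-in for Rio's Worker. Only the pending
// count matters for ordering.
type worker struct {
	name    string
	pending int
	index   int // position in the heap, kept current by Swap and Push
}

type pool []*worker

func (p pool) Len() int           { return len(p) }
func (p pool) Less(i, j int) bool { return p[i].pending < p[j].pending }
func (p pool) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
	p[i].index = i
	p[j].index = j
}
func (p *pool) Push(x interface{}) {
	w := x.(*worker)
	w.index = len(*p)
	*p = append(*p, w)
}
func (p *pool) Pop() interface{} {
	old := *p
	n := len(old)
	w := old[n-1]
	*p = old[:n-1]
	return w
}

// dispatch takes the least loaded worker off the heap, charges it one job,
// and puts it back so the heap is ordered again.
func dispatch(p *pool) string {
	w := heap.Pop(p).(*worker)
	w.pending++
	heap.Push(p, w)
	return w.name
}

// complete records that w finished a job and restores the heap order.
func complete(p *pool, w *worker) {
	w.pending--
	heap.Fix(p, w.index)
}

// demo returns the names of the workers chosen for three dispatches.
func demo() []string {
	a := &worker{name: "a", pending: 4}
	b := &worker{name: "b", pending: 1}
	c := &worker{name: "c", pending: 6}
	p := &pool{}
	for _, w := range []*worker{a, b, c} {
		heap.Push(p, w)
	}
	order := []string{dispatch(p), dispatch(p)} // b has the fewest pending jobs
	complete(p, a)
	complete(p, a) // a drops to 2 pending and becomes the least loaded
	return append(order, dispatch(p))
}

func main() {
	fmt.Println(demo()) // prints [b b a]
}
```

Both dispatch and complete run in O(log n) for n workers, which is the main reason to prefer a heap over scanning the whole pool for every request.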

Easy management of these goroutines and chaining them

How many times do we do this:


Call service 1 in goroutine 1, then wait for and get the response from goroutine 1.


Call service 2 in goroutine 2, taking a piece of data from service call 1, then wait for and get the response from goroutine 2.


Call service 3 in goroutine 3, taking a piece of data from service call 2, then wait for and get the response from goroutine 3.


You get the idea. This only delays things further and causes a lot of context switching. Rio helps here by chaining multiple calls together using closures and function types and running them in one goroutine.


Now, many may wonder whether this is slower than making multiple goroutine calls. Let’s see.


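Think of the previous example. If you do not get a response from service 1, can you invoke service 2? If service 2 fails, can you call service 3? No, because there is a data dependency between these calls. They have to run one after another anyway, so spreading them across separate goroutines adds no parallelism.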


Rio chains dependent jobs together by introducing this pattern.


request := rio.BuildRequests(context,
          <callback of service 1>.WithTimeOut(100 ms).WithRetry(3))
          .FollowedBy(<function for transforming data from service 1 response to request or partial request of 2>,
                      <callback of service 2>)
          .FollowedBy(<function for transforming data from service 2 response to request or partial request of 3>,
                      <callback of service 3>)
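
To make the idea concrete, here is a rough sketch of how dependent calls could be chained with function types and run in a single goroutine. This is not Rio's implementation or API: callback, bridge, chain, build, followedBy, run, and the two fake service functions are all hypothetical names invented for this illustration, and timeouts and retries are left out.

```go
package main

import (
	"errors"
	"fmt"
)

// callback is a hypothetical function type for one service call.
type callback func(arg interface{}) (interface{}, error)

// bridge turns the previous response into the next call's argument.
type bridge func(prev interface{}) interface{}

type step struct {
	adapt bridge // nil for the first step
	call  callback
}

type chain struct{ steps []step }

func build(first callback) *chain {
	return &chain{steps: []step{{call: first}}}
}

func (c *chain) followedBy(adapt bridge, next callback) *chain {
	c.steps = append(c.steps, step{adapt: adapt, call: next})
	return c
}

// run executes the steps one after another in the calling goroutine and
// stops at the first failure, since later steps depend on earlier data.
func (c *chain) run(initial interface{}) ([]interface{}, error) {
	responses := make([]interface{}, 0, len(c.steps))
	arg := initial
	for i, s := range c.steps {
		if s.adapt != nil {
			arg = s.adapt(responses[i-1])
		}
		resp, err := s.call(arg)
		if err != nil {
			return responses, fmt.Errorf("step %d: %w", i, err)
		}
		responses = append(responses, resp)
	}
	return responses, nil
}

var errNotFound = errors.New("not found")

// nameByID and streetByName are fake services used only for this sketch.
func nameByID(arg interface{}) (interface{}, error) {
	if arg.(int) != 7 {
		return nil, errNotFound
	}
	return "Alice", nil
}

func streetByName(arg interface{}) (interface{}, error) {
	return "12 Main St, home of " + arg.(string), nil
}

func passThrough(prev interface{}) interface{} { return prev }

func main() {
	c := build(nameByID).followedBy(passThrough, streetByName)

	// The whole chain occupies one goroutine, not one goroutine per call.
	var responses []interface{}
	var err error
	done := make(chan struct{})
	go func() {
		responses, err = c.run(7)
		close(done)
	}()
	<-done
	fmt.Println(responses, err)
}
```

The bridge function is what keeps the two services decoupled: neither callback knows about the other's types, and only the small adapter between them does.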


Let’s see an example


func SampleHandler(w http.ResponseWriter, r *http.Request) {
	// Create the load balancer. It should be created only once and reused; it is built here only to keep the example short.
	balancer := rio.GetBalancer(10, 2) // 10 threads

	// Setup the callbacks
	callback1 := GetNameById("Some Name")
	callback2 := GetStreetAddressByNameAndLocationId(rio.EMPTY_ARG_PLACEHOLDER, "Some Location ID")

	// Set up the pipeline
	request := rio.BuildRequests(context.Background(),
		rio.NewFutureTask(callback1).WithMilliSecondTimeout(10).WithRetry(3), 2).
		FollowedBy(Call1ToCall2, rio.NewFutureTask(callback2).WithMilliSecondTimeout(20))

	// Post job
	balancer.PostJob(request)

	// Wait for response
	<-request.CompletedChannel

	// Responses
	response1, err := request.GetResponse(0)
	if err == nil {
		// Do something with the response
		fmt.Println(response1)
	}
	response2, err := request.GetResponse(1)
	if err == nil {
		// Do something with the response
		fmt.Println(response2)
	}

}


Once the chaining is done with rio.BuildRequests, we post the job like this:


balancer.PostJob(request)


And finally, we wait for the chain to complete:


<-request.CompletedChannel


Once the call chain completes, the request holds the responses for all these calls in a slice, and we can read them like this:


  • Only one job response
    request.GetOnlyResponse()
    
  • Multiple job responses
    request.GetResponse(index) //---0,1,2
    


If any job fails, its response will be an empty response, specifically rio.EMPTY_CALLBACK_RESPONSE.


Check out the full example in the example folder if you want to use it.


Thanks !!!


Also published on Medium’s subdomain: