In modern programming languages, concurrency has become an unequivocal need. Almost every language today offers some method of concurrency: some have powerful constructs that can offload work onto different OS threads, e.g., Java, while others mimic this behavior within a single thread, e.g., Ruby.

Go has a very powerful concurrency model called CSP (communicating sequential processes), which breaks a problem into smaller sequential processes and then schedules several instances of these processes, called goroutines. Communication between these processes happens by passing immutable messages via channels.

Here we will explore how to take advantage of concurrency in Go, and how to limit its usage with worker pools. In the second part, we will explore how to build a robust concurrency solution around this concept.

Let's imagine we have an external API call that takes about 100ms to complete. If we have 1,000 of these calls and we make them synchronously, it would take about 100 seconds to finish them all.
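Before diving into the project code, here is a minimal sketch, not taken from this project, of the CSP idea in its simplest form: one goroutine sends messages over a channel while the main goroutine receives them.

// Illustrative sketch only; not part of the project code.
package main

import "fmt"

func main() {
    messages := make(chan string)
    go func() {
        messages <- "hello from a goroutine" // send blocks until a receiver is ready
        close(messages)
    }()
    for msg := range messages { // receive until the channel is closed
        fmt.Println(msg)
    }
}

Now let's model our slow API call scenario.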
//// model/data.go
package model

// SimpleData is the unit of work passed around by the workers.
type SimpleData struct {
    ID int
}
//// basic/basic.go
package basic

import (
    "fmt"
    "time"

    "github.com/Joker666/goworkerpool/model"
)

// Work processes every item synchronously, one after another.
func Work(allData []model.SimpleData) {
    start := time.Now()
    for i := range allData {
        Process(allData[i])
    }
    elapsed := time.Since(start)
    fmt.Printf("Took ===============> %s\n", elapsed)
}

// Process simulates a 100ms external API call.
func Process(data model.SimpleData) {
    fmt.Printf("Start processing %d\n", data.ID)
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("Finish processing %d\n", data.ID)
}
//// main.go
package main

import (
    "fmt"

    "github.com/Joker666/goworkerpool/basic"
    "github.com/Joker666/goworkerpool/model"
)

func main() {
    // Prepare the data
    var allData []model.SimpleData
    for i := 0; i < 1000; i++ {
        data := model.SimpleData{ID: i}
        allData = append(allData, data)
    }
    fmt.Printf("Start processing all work \n")

    // Process
    basic.Work(allData)
}
Start processing all work
Took ===============> 1m40.226679665s
//// worker/notPooled.go
package worker

import (
    "fmt"
    "sync"
    "time"

    "github.com/Joker666/goworkerpool/basic"
    "github.com/Joker666/goworkerpool/model"
)

// NotPooledWork spawns one goroutine per task, with no upper bound.
func NotPooledWork(allData []model.SimpleData) {
    start := time.Now()
    var wg sync.WaitGroup
    dataCh := make(chan model.SimpleData, 100)

    wg.Add(1)
    go func() {
        defer wg.Done()
        for data := range dataCh {
            // One new goroutine per item: nothing limits how many run at once.
            wg.Add(1)
            go func(data model.SimpleData) {
                defer wg.Done()
                basic.Process(data)
            }(data)
        }
    }()

    for i := range allData {
        dataCh <- allData[i]
    }
    close(dataCh)
    wg.Wait()

    elapsed := time.Since(start)
    fmt.Printf("Took ===============> %s\n", elapsed)
}
//// main.go
// Process (swap this in for basic.Work, and import the worker package)
worker.NotPooledWork(allData)
Start processing all work
Took ===============> 101.191534ms
Here, we create a buffered channel with a capacity of 100 and push all the data passed to NotPooledWork into it. Because the channel is buffered, it cannot hold more than 100 items until data is extracted from it, which we do inside a goroutine: we range over the channel, pull each item out, and launch a new goroutine to process it (a tiny sketch below isolates this buffering behavior). There is no limit on how many goroutines we create here, so this can process as many tasks as we pass in; theoretically, it can handle as much data as the available resources allow. If we run this, we complete 1,000 tasks in around 100ms. Crazy, right? Not quite: one goroutine per task means thousands of concurrent calls in flight at once, which can exhaust memory and file descriptors and overwhelm the downstream API. We need to limit how many goroutines do the work, and that is exactly what a worker pool does.
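To see the buffering behavior in isolation, here is a small illustrative sketch, not part of the project code: sends succeed immediately while the buffer has room, and block once it is full.

// Illustrative sketch only; not part of the project code.
package main

import "fmt"

func main() {
    ch := make(chan int, 2) // buffered channel with capacity 2
    ch <- 1                 // fits in the buffer, does not block
    ch <- 2                 // buffer is now full
    // A third send would block here until another goroutine receives:
    // ch <- 3
    fmt.Println(<-ch) // 1
    fmt.Println(<-ch) // 2
}

//// worker/pooled.go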
package worker

import (
    "fmt"
    "sync"
    "time"

    "github.com/Joker666/goworkerpool/basic"
    "github.com/Joker666/goworkerpool/model"
)

// PooledWork runs a fixed pool of workers that consume tasks from a channel.
func PooledWork(allData []model.SimpleData) {
    start := time.Now()
    var wg sync.WaitGroup
    workerPoolSize := 100

    dataCh := make(chan model.SimpleData, workerPoolSize)

    // Start exactly workerPoolSize workers; each keeps pulling tasks
    // from the channel until it is closed.
    for i := 0; i < workerPoolSize; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for data := range dataCh {
                basic.Process(data)
            }
        }()
    }

    for i := range allData {
        dataCh <- allData[i]
    }
    close(dataCh)
    wg.Wait()

    elapsed := time.Since(start)
    fmt.Printf("Took ===============> %s\n", elapsed)
}
//// main.go
// Process
worker.PooledWork(allData)
Start processing all work
Took ===============> 1.002972449s
Channels work like queues in Go.
This is a nice solution: if we run it, all the tasks finish in about 1 second. Not quite 100ms, but we don't need that here; with 100 workers each processing 10 tasks at roughly 100ms apiece, about 1 second is exactly what we should expect, and we get a much better solution that distributes the load over time.
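As an aside, a fixed pool of workers ranging over a shared channel is not the only way to cap concurrency in Go. A buffered channel used as a semaphore achieves the same limit; the sketch below is illustrative only (the file and function names are hypothetical, not part of this project).

//// worker/semaphore.go (hypothetical sketch, not part of the project)
package worker

import (
    "sync"

    "github.com/Joker666/goworkerpool/basic"
    "github.com/Joker666/goworkerpool/model"
)

// SemaphoreWork allows at most 100 tasks to run at once by acquiring
// a slot in a buffered channel before each goroutine starts.
func SemaphoreWork(allData []model.SimpleData) {
    var wg sync.WaitGroup
    sem := make(chan struct{}, 100) // semaphore with 100 slots
    for i := range allData {
        wg.Add(1)
        sem <- struct{}{} // acquire a slot; blocks while 100 tasks are in flight
        go func(data model.SimpleData) {
            defer wg.Done()
            defer func() { <-sem }() // release the slot
            basic.Process(data)
        }(allData[i])
    }
    wg.Wait()
}

So far, though, we are ignoring failures entirely. Next, let's extend the pooled version to collect errors.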
//// worker/pooledError.go
package worker

import (
    "fmt"
    "sync"
    "time"

    "github.com/Joker666/goworkerpool/model"
)

// PooledWorkError runs a fixed pool of workers and collects errors
// from a dedicated errors channel while the work is in progress.
func PooledWorkError(allData []model.SimpleData) {
    start := time.Now()
    var wg sync.WaitGroup
    workerPoolSize := 100

    dataCh := make(chan model.SimpleData, workerPoolSize)
    errors := make(chan error, 1000)

    for i := 0; i < workerPoolSize; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for data := range dataCh {
                process(data, errors)
            }
        }()
    }

    for i := range allData {
        dataCh <- allData[i]
    }
    close(dataCh)

    // Collector goroutine: print errors as they arrive, and stop
    // once no error has arrived for a full second.
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            select {
            case err := <-errors:
                fmt.Println("finished with error:", err.Error())
            case <-time.After(time.Second * 1):
                fmt.Println("Timeout: errors finished")
                return
            }
        }
    }()

    defer close(errors)
    wg.Wait()

    elapsed := time.Since(start)
    fmt.Printf("Took ===============> %s\n", elapsed)
}

// process simulates the API call, failing for every ID divisible by 29.
func process(data model.SimpleData, errors chan<- error) {
    fmt.Printf("Start processing %d\n", data.ID)
    time.Sleep(100 * time.Millisecond)
    if data.ID%29 == 0 {
        errors <- fmt.Errorf("error on job %v", data.ID)
    } else {
        fmt.Printf("Finish processing %d\n", data.ID)
    }
}
//// main.go
// Process
worker.PooledWorkError(allData)
We have modified our process method to simulate errors: any task whose ID is divisible by 29 fails and pushes an error onto the errors channel that was passed in. To handle errors in a concurrent model, we need such a channel to hold the error data; here, a separate collector goroutine reads from it while the work runs and exits once no error has arrived for a second. Each error carries the ID of the failed task, so those tasks can be processed again if needed. This is a better solution than the earlier one, which did not handle errors at all.
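One caveat in this version: the collector relies on a one-second timeout to decide that the errors are finished, which adds a fixed wait at the end. A common variation, shown below as an illustrative sketch (PooledWorkDrain is hypothetical and not part of the project; it reuses process from pooledError.go), is to close the errors channel only after all the workers have finished, then drain it synchronously with no timeout at all.

//// worker/pooledDrain.go (hypothetical sketch, not part of the project)
package worker

import (
    "fmt"
    "sync"

    "github.com/Joker666/goworkerpool/model"
)

func PooledWorkDrain(allData []model.SimpleData) {
    var wg sync.WaitGroup
    workerPoolSize := 100

    dataCh := make(chan model.SimpleData, workerPoolSize)
    errors := make(chan error, len(allData)) // large enough that error sends never block

    for i := 0; i < workerPoolSize; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for data := range dataCh {
                process(data, errors)
            }
        }()
    }

    for i := range allData {
        dataCh <- allData[i]
    }
    close(dataCh)

    wg.Wait()     // every worker has returned, so no more sends into errors
    close(errors) // safe to close now; the range below will terminate
    for err := range errors {
        fmt.Println("finished with error:", err.Error())
    }
}

But we are not done with it yet. In the second part, we will build a dedicated worker pool package that is robust and can handle concurrent tasks with a configurable limit on the number of workers.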