1. Concept
A work pool is analogous to a thread pool. Because creating and destroying threads is expensive, a thread pool keeps a finished thread and returns it to the pool for reuse instead of destroying it. Go, however, does not schedule work on system threads directly but on goroutines, which are far cheaper to create and destroy. So in Go the pattern is no longer really a thread pool but a work pool (worker pool).
Although goroutines are cheap, you cannot sprinkle go func() through your code at will. A program that starts goroutines unboundedly can run into severe, hard-to-control performance problems. For workloads that can be anticipated in advance, a work pool should be considered.
The job of a work pool is to control the size of the goroutine set, that is, the number of goroutines. In Go, the most natural way to bound the number of goroutines is a buffered channel.
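Before the full worker-pool examples below, here is a minimal sketch of the buffered-channel technique on its own: the channel's capacity is the concurrency limit. The names `processAll` and `sem` are illustrative, not from the article.

```go
package main

import (
	"fmt"
	"sync"
)

// processAll runs fn over items while never allowing more than `limit`
// goroutines to run at once, using a buffered channel as a counting semaphore.
func processAll(items []int, limit int, fn func(int) int) []int {
	sem := make(chan struct{}, limit) // capacity = max concurrent goroutines
	results := make([]int, len(items))
	var wg sync.WaitGroup
	for i, v := range items {
		wg.Add(1)
		sem <- struct{}{} // blocks when `limit` goroutines are already running
		go func(i, v int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when done
			results[i] = fn(v)
		}(i, v)
	}
	wg.Wait()
	return results
}

func main() {
	doubled := processAll([]int{1, 2, 3, 4, 5}, 3, func(n int) int { return n * 2 })
	fmt.Println(doubled) // [2 4 6 8 10]
}
```

No more than three goroutines ever run at once here, no matter how many items there are.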
2. Examples
1. Simple example
Below is the classic Go idiom for a work pool.
package main

import (
	"fmt"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
	for job := range jobs {
		fmt.Printf("worker(%d) start to do job(%d)\n", id, job)
		time.Sleep(time.Second) // simulate the work
		fmt.Printf("worker(%d) finished job(%d)\n", id, job)
		results <- job
	}
}

func main() {
	// To use the work pool we need to send jobs in and receive results back,
	// so we define two channels: jobs and results.
	jobs := make(chan int, 100)
	results := make(chan int, 100)
	// Start 3 workers
	for id := 1; id <= 3; id++ {
		go worker(id, jobs, results)
	}
	// Create 5 jobs
	for job := 1; job <= 5; job++ {
		jobs <- job
	}
	close(jobs)
	// Collect the results
	for i := 1; i <= 5; i++ {
		<-results
	}
}
The work-pool idea in the code above is embodied mainly by the jobs channel. Because it is a buffered channel with capacity 100, a send blocks once 100 jobs are queued; only after a worker takes a job off the channel can new work be submitted.
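The blocking behavior of a buffered channel can be seen in isolation with a tiny sketch (the capacity of 2 is arbitrary):

```go
package main

import "fmt"

// capAndLen fills a buffered channel to capacity and shows that a receive
// frees a slot for the next send. A send on a full buffered channel blocks.
func capAndLen() (int, int, int) {
	ch := make(chan int, 2) // buffered channel, capacity 2
	ch <- 1
	ch <- 2
	// ch <- 3 here would block until a receiver drained a slot
	full := len(ch) // buffer is now full
	<-ch            // a receive frees one slot
	ch <- 3         // so this send succeeds immediately
	return full, len(ch), cap(ch)
}

func main() {
	full, after, capacity := capAndLen()
	fmt.Println(full, after, capacity) // 2 2 2
}
```

This is exactly why the jobs channel above throttles producers once 100 jobs are pending.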
This case is simple. When there are many workers and jobs take long to run, the jobs/worker pattern needs refinement: each worker handles one job at a time, and the pool caps the maximum number of workers. That bounds the number of goroutines, keeps the program controllable, and prevents runaway resource consumption from crushing the system.
2. Reading jobs from a queue
The following code improves on the previous example.
package main

import (
	"fmt"
	"reflect"
	"time"
)

// Job is the task content
type Job struct {
	ID   int
	Name string
}

// Worker executes jobs
type Worker struct {
	id         int           // worker number
	WorkerPool chan chan Job // shared pool (channel of channels); each element is a Job channel
	JobChannel chan Job      // private job channel of this worker
	exit       chan bool     // exit signal
}

var (
	MaxWorker = 5                 // maximum number of workers
	JobQueue  = make(chan Job, 5) // global job queue, simulating incoming work
)

// Scheduler dispatches jobs to workers
type Scheduler struct {
	WorkerPool   chan chan Job // shared worker pool
	WorkerMaxNum int           // maximum number of workers
	Workers      []*Worker     // worker list
}

// NewScheduler creates a scheduling center
func NewScheduler(workerMaxNum int) *Scheduler {
	workerPool := make(chan chan Job, workerMaxNum) // worker pool
	return &Scheduler{WorkerPool: workerPool, WorkerMaxNum: workerMaxNum}
}

// Start creates the workers and begins scheduling
func (s *Scheduler) Start() {
	workers := make([]*Worker, s.WorkerMaxNum)
	for i := 0; i < s.WorkerMaxNum; i++ {
		worker := NewWorker(s.WorkerPool, i)
		worker.Start()
		workers[i] = &worker
	}
	s.Workers = workers
	go s.schedule()
}

// Stop shuts down the work pool
func (s *Scheduler) Stop() {
	for _, w := range s.Workers {
		w.Stop()
	}
	time.Sleep(time.Second) // crude wait so in-flight jobs can finish in this simulation
	close(s.WorkerPool)
}

func NewWorker(workerPool chan chan Job, id int) Worker {
	fmt.Printf("new a worker(%d)\n", id)
	return Worker{
		id:         id,
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		exit:       make(chan bool),
	}
}

// Start listens for jobs and the exit signal
func (w Worker) Start() {
	go func() {
		for {
			w.WorkerPool <- w.JobChannel // register the private JobChannel into the shared pool
			select {
			case job := <-w.JobChannel: // received a job
				fmt.Println("get a job from private JobChannel")
				fmt.Println(job)
			case <-w.exit: // received the exit signal
				fmt.Println("worker exit:", w.id)
				return
			}
		}
	}()
}

func (w Worker) Stop() {
	go func() {
		w.exit <- true
	}()
}

// schedule pulls jobs from JobQueue and hands them to idle workers
func (s *Scheduler) schedule() {
	for {
		select {
		case job := <-JobQueue:
			fmt.Println("get a job from JobQueue")
			go func(job Job) {
				// take a JobChannel from the WorkerPool; blocks when all workers are busy
				jobChannel := <-s.WorkerPool
				fmt.Println("get a private jobChannel from public WorkerPool", reflect.TypeOf(jobChannel))
				jobChannel <- job
				fmt.Println("worker's private jobChannel add one job")
			}(job)
		}
	}
}

func main() {
	scheduler := NewScheduler(MaxWorker)
	scheduler.Start()
	jobQueue()
	scheduler.Stop()
}

// jobQueue simulates producing jobs
func jobQueue() {
	for i := 1; i <= 30; i++ {
		JobQueue <- Job{ID: i, Name: fmt.Sprintf("Job【%d】", i)}
		fmt.Printf("jobQueue add %d job\n", i)
	}
}
The Job structure represents a unit of work. It has no substantive content; only an integer ID and a Name are defined here.
Two global variables are defined: MaxWorker, the maximum number of workers, and JobQueue, a channel of Jobs. Both exist only for the simulation below and need not be globals in a real scenario.
A Worker structure is defined. Unlike the simple work pool in the previous example, a worker here is no longer a bare goroutine but a struct with four fields:
▪ id: the worker's number.
▪ exit: a bool channel; when a value is written to it, the worker stops running.
▪ JobChannel: a channel of Job, the private work queue exclusive to this worker.
▪ WorkerPool: note the type, chan chan Job, a channel whose elements are themselves Job channels.
The NewWorker function creates a new worker. Note that its workerPool parameter is passed in at creation time, which means every worker shares the same WorkerPool; in other words, many workers use one pool. This is very important, and it is the key optimization of this example over the simple one. The JobChannel and exit channels are created fresh with each new Worker.
Worker's Start method listens for jobs or the exit signal. It launches a goroutine running an anonymous function whose body is an infinite loop. The first thing each iteration does is register the worker's JobChannel into the WorkerPool; once registered, the worker can receive a task. A select then waits on two cases: if JobChannel is readable, there is a Job, and it is processed; if exit is readable, the loop ends. Pay special attention to how the WorkerPool is used in the code that follows: the scheduler takes JobChannels out of it to hand out work. Worker's Stop method simply writes to the exit channel; the Start loop reads that value and terminates.
The NewScheduler function creates a Scheduler. The WorkerPool it builds with make is the same channel later passed into NewWorker as a parameter. Note that WorkerPool is a buffered channel whose capacity is workerMaxNum.
Scheduler's Start method creates WorkerMaxNum workers, stores their references in the Workers slice, calls each worker's Start method immediately after creating it, and finally runs the schedule method in a goroutine. Scheduler's Stop method closes the work pool: it calls every worker's Stop method and then closes the WorkerPool channel.
Scheduler's schedule method is also an infinite loop. It keeps reading from JobQueue and, for each job, starts a goroutine that reads one JobChannel from the WorkerPool. Note that a JobChannel only appears there after a worker has registered itself; if the WorkerPool buffer holds no JobChannel, the read blocks, and the Job is written only once a JobChannel has been obtained.
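The register-then-dispatch handshake described above can be reduced to a few lines. In this sketch (runPool and its parameters are illustrative names, not from the book's code), each worker publishes its private channel into the shared pool, and the dispatcher hands the next job to whichever channel it receives:

```go
package main

import "fmt"

// runPool dispatches job IDs 1..jobs across `workers` goroutines using the
// chan-chan handshake: an idle worker registers its private channel in the
// shared pool, and the dispatcher sends the next job to that channel.
func runPool(jobs, workers int) int {
	pool := make(chan chan int, workers) // shared pool of private channels
	done := make(chan int, jobs)
	for w := 0; w < workers; w++ {
		jobCh := make(chan int)
		go func(jobCh chan int) {
			for {
				pool <- jobCh // register: "I am idle"
				id := <-jobCh // wait for a dispatched job
				done <- id
			}
		}(jobCh)
	}
	for i := 1; i <= jobs; i++ {
		ch := <-pool // blocks until some worker is idle
		ch <- i
	}
	sum := 0
	for i := 0; i < jobs; i++ {
		sum += <-done
	}
	return sum
}

func main() {
	fmt.Println(runPool(4, 2)) // 1+2+3+4 = 10
}
```

Because the dispatcher can only obtain a channel that an idle worker has registered, backpressure is automatic: dispatch blocks while all workers are busy.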
Note: The content of this article comes from "Go Microservice Practical"