0. Introduction
The traditional concurrent programming model is based onThread
andSynchronous access control for shared memory
The shared data is protected by locks, and threads will compete for these locks to access the data. Generally speaking, useThread safety
The data structure will make this easier.Go
concurrent primitives (goroutine
andchannel
) provides an elegant way to build concurrent models.Go
Encouragement ingoroutine
Use betweenchannel
to pass data instead of explicitly using locks to restrict access to shared data.
Do not communicate by sharing memory; instead, share memory by communicating.
This isGo
The concurrency philosophy, which relies onCSP(Communicating Sequential Processes)
Model, it is often consideredGo
Key factors in success in concurrent programming.
Ifgoroutine
yesGo
If the concurrent body of the language program, thenchannel
It is the communication mechanism between them, the previous series of blogsgoroutine
Its scheduling mechanism is introduced. This article will introduce the communication mechanism between the two-channel
。
1. Channel data structure
type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex }
existruntime/
middle,channel
is defined as above, where:
-
buf
: There is a cachechannel
Hold it, it is used to store cached data and collect a circular linked list; -
dataqsiz
: The maximum capacity of the above-mentioned cached data revolving list is understood ascap()
; -
qcount
: The length of the loop linked list of the above cached data is understood aslen()
; -
recvx
andsendx
: Indicates the reception or transmission location of the above cache; -
recvq
andsendq
: Receive and send respectivelygoroutine
abstract(sudog
) queue is a two-way linked list; -
lock
: Mutex lock, used to ensurechannel
Thread safety of data.
2. Channel creation
func makechan64(t *chantype, size int64) *hchan { if int64(int(size)) != size { panic(plainError("makechan: size out of range")) } return makechan(t, int(size)) } func makechan(t *chantype, size int) *hchan { elem := // compiler checks this but be safe. if >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || > maxAlign { throw("makechan: bad alignment") } mem, overflow := (, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { case mem == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. = () case == 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) = add((c), hchanSize) default: // Elements contain pointers. c = new(hchan) = mallocgc(mem, elem, true) } = uint16() = elem = uint(size) lockInit(&, lockRankHchan) if debugChan { print("makechan: chan=", c, "; elemsize=", , "; dataqsiz=", size, "\n") } return c }
All calls will eventually goFunctions, the function does a simple thing, it is to initialize a
object, and
map
Same,channel
External is a pointing object (slices and strings are not pointing objecting objects. Take slices as an example, you can refer to them.Link). You can see:
- If the current
channel
There is no cache, then it will onlyAllocate a piece of space;
- If the current
channel
The type stored in the file is not a pointer type, then it will be the current oneAllocate a continuous memory space with the underlying continuous array;
- In other cases, then
and its cache allocate a piece of memory;
3. Data sending
// entry point for c <- x from compiled code //go:nosplit func chansend1(c *hchan, elem ) { chansend(c, elem, true, getcallerpc()) }
channel
The data sending will be calledruntime.chansend1
function, and the function is just calledFunction, this function is relatively long, let's analyze it bit by bit:
3.1 Data transmission in empty channels
func chansend(c *hchan, ep , block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } ... }
You can see if the channel isnil
, then when writing data into this channel:
- Non-blocking write will return directly (only
channel
Send+default
Branchedselect
Called during operationfunction, thus non-blocking write);
- Blocking writing (normal
ch <- v
) will passgopark
The function gives up the CPU scheduling rights and blocks thisgoroutine
;
3.2 Send directly
func chansend(c *hchan, ep , block bool, callerpc uintptr) bool { ... if != 0 { unlock(&) panic(plainError("send on closed channel")) } if sg := (); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&) }, 3) return true } ... }
Can be found whenchannel
After being closed, it will causepanic
。
If the targetchannel
There is no close, and there are already waiting for readinggoroutine
, then it will be directly fromrecvq
Take out the first one to waitgoroutine
, and passThe function sends data to it:
func send(c *hchan, sg *sudog, ep , unlockf func(), skip int) { if raceenabled { if == 0 { racesync(c, sg) } else { // Pretend we go through the buffer, even though // we copy directly. Note that we need to increment // the head/tail locations only when raceenabled. racenotify(c, , nil) racenotify(c, , sg) ++ if == { = 0 } = // = (+1) % } } if != nil { sendDirect(, sg, ep) = nil } gp := unlockf() = (sg) = true if != 0 { = cputicks() } goready(gp, skip+1) }
As you can see, the above function does two things:
- Call
sendDirect
The function copies the sent data to the address where the variable that receives the coroutine is located; - pass
goready
The function wakes up the coroutine and sets its state to_Grunnable
The next pending process after the queue placed on the processorgoroutine
;
3.3 Cache area
If there is no already read waitinggoroutine
, and createdchannel
If the cache is included and the cache is not full, the following code will be executed:
func chansend(c *hchan, ep , block bool, callerpc uintptr) bool { ... if < { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, ) if raceenabled { racenotify(c, , nil) } typedmemmove(, qp, ep) ++ if == { = 0 } ++ unlock(&) return true } ... }
Here will be passed firstThe function calculates the next location that can be stored, and then passes
Copy the sent data into the buffer and add it
sendx
Index andqcount
counter. Wait for receiving datagoroutine
can be read directly from the cache.
3.4 Blocking send
If you don't wait for readinggoroutine
, there is no cache area or the cache area is full, then the data will be blocked:
func chansend(c *hchan, ep , block bool, callerpc uintptr) bool { ... if !block { unlock(&) return false } // Block on the channel. Some receiver will complete our operation for us. gp := getg() mysg := acquireSudog() = 0 if t0 != 0 { = -1 } // No stack splits between assigning elem and enqueuing mysg // on where copystack can find it. = ep = nil = gp = false = c = mysg = nil (mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set is not safe for // stack shrinking. atomic.Store8(&, 1) gopark(chanparkcommit, (&), waitReasonChanSend, traceEvGoBlockSend, 2) // Ensure the value being sent is kept alive until the // receiver copies it out. The sudog has a pointer to the // stack object, but sudogs aren't considered as roots of the // stack tracer. KeepAlive(ep) // someone woke us up. if mysg != { throw("G waiting list is corrupted") } = nil = false closed := ! = nil if > 0 { blockevent(-t0, 2) } = nil releaseSudog(mysg) if closed { if == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true }
- Call
Get the data sent at this time
goroutine
; - Call
Get
sudog
Structure and set relevant information; - Get the previous step
sudog
Put it in the send waiting queue and call itgopark
suspend the current coroutine; - Wait for receiving data
goroutine
After arrival, wake up thisgoroutine
, then continue to go down; orclose
Got thischannel
, resulting in subsequentpanic
。
4. Receive data
4.1 Data reception in empty channels
func chanrecv(c *hchan, ep , block bool) (selected, received bool) { // raceenabled: don't need to check ep, as it is always on the stack // or is new memory allocated by reflect. if debugChan { print("chanrecv: chan=", c, "\n") } if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } ... lock(&) if != 0 && == 0 { if raceenabled { raceacquire(()) } unlock(&) if ep != nil { typedmemclr(, ep) } return true, false } ... }
The above is part of the code when the channel is received, you can see:
- Just like sending data, if the channel is
nil
, and non-blocking reading, it will return, and it will hang after blocking reading; - What is different from when sending data is that if it is a closed channel, it is actually readable, but the data read back is
Zero value + false
。
4.2 Direct Receive
func chanrecv(c *hchan, ep , block bool) (selected, received bool) { ... if sg := (); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&) }, 3) return true, true } ... }
whenchannel
ofsendq
The queue contains waiting stategoroutine
When the earliest waiting data is retrievedgoroutine
, and then callSend:
func recv(c *hchan, sg *sudog, ep , unlockf func(), skip int) { if == 0 { if raceenabled { racesync(c, sg) } if ep != nil { // copy data from sender recvDirect(, sg, ep) } } else { // Queue is full. Take the item at the // head of the queue. Make the sender enqueue // its item at the tail of the queue. Since the // queue is full, those are both the same slot. qp := chanbuf(c, ) if raceenabled { racenotify(c, , nil) racenotify(c, , sg) } // copy data from queue to receiver if ep != nil { typedmemmove(, ep, qp) } // copy data from sender to queue typedmemmove(, qp, ) ++ if == { = 0 } = // = (+1) % } = nil gp := unlockf() = (sg) = true if != 0 { = cputicks() } goready(gp, skip+1) }
This function will be processed separately according to whether there is a cache area:
- If the cache does not exist, call
The function will be sent directly
goroutine
The stored data is copied to the target memory address, which is equivalent to directly from thisgoroutine
fetch data from; - If a cache exists, first copy the data in the cache to the target memory address, and then
gp
Copying the data to the end of the cache area is equivalent to fetching the data from the cache queue header for receivinggoroutine
, in the waiting time to sendgoroutine
Take out the data from the cache queue and you can see that the queue must be full at this time.
In the end, no matter what the situation is, it needs to be calledgoready
wakegp
。
4.3 Get from the cache area
In fact, the description of the chapter name here is not accurate. In 4.2, there is also a situation where data is obtained from the cache area. The difference is:
- In 4.2, the cache queue is full, and there are send blocking and waiting.
goroutine
; - There is no send blocking waiting in 4.3
goroutine
。
func chanrecv(c *hchan, ep , block bool) (selected, received bool) { ... if > 0 { // Receive directly from queue qp := chanbuf(c, ) if raceenabled { racenotify(c, , nil) } if ep != nil { typedmemmove(, ep, qp) } typedmemclr(, qp) ++ if == { = 0 } -- unlock(&) return true, true } ... }
As at sending, if there is data in the cache area, then copy the data from the cache area.
4.4 Blocking reception
func chanrecv(c *hchan, ep , block bool) (selected, received bool) { ... if !block { unlock(&) return false, false } // no sender available: block on this channel. gp := getg() mysg := acquireSudog() = 0 if t0 != 0 { = -1 } // No stack splits between assigning elem and enqueuing mysg // on where copystack can find it. = ep = nil = mysg = gp = false = c = nil (mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set is not safe for // stack shrinking. atomic.Store8(&, 1) gopark(chanparkcommit, (&), waitReasonChanReceive, traceEvGoBlockRecv, 2) // someone woke us up if mysg != { throw("G waiting list is corrupted") } = nil = false if > 0 { blockevent(-t0, 2) } success := = nil = nil releaseSudog(mysg) return true, success }
Similar to blocking sending, if there is no waiting for sendinggoroutine
, and there is no cache area or the cache area has no data, then you need to receive thisgoroutine
Press torecvq
andgopark
Hang, waiting for wake up.
5. Close
func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) } lock(&) if != 0 { unlock(&) panic(plainError("close of closed channel")) } if raceenabled { callerpc := getcallerpc() racewritepc((), callerpc, (closechan)) racerelease(()) } = 1 var glist gList // release all readers for { sg := () if sg == nil { break } if != nil { typedmemclr(, ) = nil } if != 0 { = cputicks() } gp := = (sg) = false if raceenabled { raceacquireg(gp, ()) } (gp) } // release all writers (they will panic) for { sg := () if sg == nil { break } = nil if != 0 { = cputicks() } gp := = (sg) = false if raceenabled { raceacquireg(gp, ()) } (gp) } unlock(&) // Ready all Gs now that we've dropped the channel lock. for !() { gp := () = 0 goready(gp, 3) } }
The code to close the channel looks very long, but after handling some special cases, it is used to all the data in the sending and receiving queues.goready
wake.
6. Summary
existGo
Although highly praisedCSP
Philosophy, recommended for everyonechannel
Implement shared memory protection, but:
Behind the scenes, channels use locks to serialize access and provide thread safety. So by synchronizing access to memory using channels, you are actually using locks. Lock in a thread-safe queue wrapped. Then, with just using the standard library
sync
Compared with the mutex in the package, what about Go's fancy locks? The following numbers are derived by continuously calling Put on their single set using Go's built-in benchmarking capabilities.
`> BenchmarkSimpleSet-8 3000000 391 ns/op`
`> BenchmarkSimpleChannelSet-8 1000000 1699 ns/o`
In my personal understanding:
- Used when transferring data
channel
; - Used when protecting memory data
;
- use
channel
andselect
The characteristics ofLinux epoll
function.
The above is the detailed explanation of the Channel in Golang concurrent programming. For more information about Golang Channel, please follow my other related articles!