
Detailed explanation of Golang concurrent programming: channels

0. Introduction

The traditional concurrent programming model is based on threads and synchronized access to shared memory: shared data is protected by locks, and threads compete for those locks in order to touch the data. Thread-safe data structures generally make this easier. Go's concurrency primitives (goroutines and channels) offer an elegant way to build concurrent programs: Go encourages goroutines to pass data to each other over channels rather than guarding shared data with explicit locks.

Do not communicate by sharing memory; instead, share memory by communicating.

This is Go's concurrency philosophy. It is rooted in the CSP (Communicating Sequential Processes) model and is often regarded as a key factor in Go's success in concurrent programming.
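
As a minimal sketch of this philosophy, the goroutine below owns the data it produces and hands each value to the consumer over a channel, so no lock is needed:

package main

import "fmt"

// A worker goroutine owns its results and communicates them over a
// channel; main never touches shared state directly.
func main() {
   results := make(chan int)

   go func() {
      for i := 1; i <= 3; i++ {
         results <- i * i // hand the value to the receiver
      }
      close(results)
   }()

   for v := range results {
      fmt.Println(v)
   }
}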

If goroutines are the concurrent units of a Go program, then channels are the communication mechanism between them. An earlier post in this series introduced goroutines and their scheduling; this article covers how goroutines communicate: the channel.

1. Channel data structure

type hchan struct {
   qcount   uint           // total data in the queue
   dataqsiz uint           // size of the circular queue
   buf      unsafe.Pointer // points to an array of dataqsiz elements
   elemsize uint16
   closed   uint32
   elemtype *_type // element type
   sendx    uint   // send index
   recvx    uint   // receive index
   recvq    waitq  // list of recv waiters
   sendq    waitq  // list of send waiters

   // lock protects all fields in hchan, as well as several
   // fields in sudogs blocked on this channel.
   //
   // Do not change another G's status while holding this lock
   // (in particular, do not ready a G), as this can deadlock
   // with stack shrinking.
   lock mutex
}

The definition above lives in runtime/chan.go, where:

  • buf: for a buffered channel, points to the circular buffer (a ring backed by an array) that holds the buffered elements;
  • dataqsiz: the capacity of that circular buffer, i.e. what cap() reports;
  • qcount: the number of elements currently in the buffer, i.e. what len() reports (see the snippet after this list);
  • recvx and sendx: the receive and send positions inside the buffer;
  • recvq and sendq: the queues (doubly linked lists) of goroutine wrappers (sudog) waiting to receive or send;
  • lock: a mutex that guarantees thread-safe access to the channel's data.
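
The relationship between dataqsiz/qcount and cap()/len() can be observed directly from user code; a quick check:

package main

import "fmt"

func main() {
   ch := make(chan int, 3) // dataqsiz = 3
   ch <- 1
   ch <- 2 // qcount = 2, sendx = 2, recvx = 0

   fmt.Println(len(ch), cap(ch)) // prints: 2 3
}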

2. Channel creation

func makechan64(t *chantype, size int64) *hchan {
   if int64(int(size)) != size {
      panic(plainError("makechan: size out of range"))
   }

   return makechan(t, int(size))
}

func makechan(t *chantype, size int) *hchan {
   elem := t.elem

   // compiler checks this but be safe.
   if elem.size >= 1<<16 {
      throw("makechan: invalid channel element type")
   }
   if hchanSize%maxAlign != 0 || elem.align > maxAlign {
      throw("makechan: bad alignment")
   }

   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError("makechan: size out of range"))
   }

   // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
   // buf points into the same allocation, elemtype is persistent.
   // SudoG's are referenced from their owning thread so they can't be collected.
   // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
   var c *hchan
   switch {
   case mem == 0:
      // Queue or element size is zero.
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      // Race detector uses this location for synchronization.
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
      // Elements do not contain pointers.
      // Allocate hchan and buf in one call.
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // Elements contain pointers.
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)

   if debugChan {
      print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
   }
   return c
}

Every make(chan T, ...) call eventually reaches the makechan function, which does one simple thing: it initializes an hchan object. Like a map, a channel value is, externally, a pointer to that object (slices and strings, by contrast, are not plain pointer values; see the earlier post on slices for details). From the code you can see:

  • If the channel has no buffer (or the element size is zero), only the hchan header itself is allocated;
  • If the element type stored in the channel contains no pointers, the hchan header and its backing buffer are allocated as one contiguous block of memory;
  • Otherwise, the hchan header and its buffer are allocated separately (see the sketch after this list).
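
A rough mapping of the three cases onto ordinary make calls (the allocation strategy is internal to the runtime and may differ between versions):

package main

func main() {
   // Which makechan branch runs depends on the element type and buffer size.
   ch1 := make(chan int)     // mem == 0: only the hchan header is allocated
   ch2 := make(chan int, 8)  // element has no pointers: hchan and buf in one allocation
   ch3 := make(chan *int, 8) // element contains pointers: buf is allocated separately

   _, _, _ = ch1, ch2, ch3
}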

3. Data sending

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
   chansend(c, elem, true, getcallerpc())
}

Sending on a channel compiles down to a call to runtime.chansend1, which in turn simply calls chansend. That function is fairly long, so let's walk through it piece by piece:

3.1 Sending on a nil channel

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   if c == nil {
      if !block {
         return false
      }
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }

   ...
}

You can see that if the channel is nil, then when writing to it:

  • A non-blocking write returns false immediately (chansend is called with block=false only from a select statement with a default branch, which is what makes the write non-blocking);
  • A blocking write (a plain ch <- v) calls gopark to give up the CPU and parks the goroutine forever (see the snippet after this list).
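
A small demonstration of the two behaviors on a nil channel:

package main

import "fmt"

func main() {
   var ch chan int // nil channel

   // Non-blocking send: compiled into a chansend call with block=false,
   // which returns false for a nil channel, so the default branch runs.
   select {
   case ch <- 1:
      fmt.Println("sent")
   default:
      fmt.Println("nil channel: a send would block forever")
   }

   // A plain `ch <- 1` here would park this goroutine permanently; with
   // no other runnable goroutines the runtime would then report
   // "all goroutines are asleep - deadlock!".
}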

3.2 Send directly

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   ...

   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }

   if sg := c.recvq.dequeue(); sg != nil {
      // Found a waiting receiver. We pass the value we want to send
      // directly to the receiver, bypassing the channel buffer (if any).
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }

   ...
}

Notice that once a channel has been closed, sending on it causes a panic.

If the target channel is not closed and there is already a goroutine waiting to receive, the first waiter is dequeued from recvq and the value is handed to it directly via the send function:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   if raceenabled {
      if c.dataqsiz == 0 {
         racesync(c, sg)
      } else {
         // Pretend we go through the buffer, even though
         // we copy directly. Note that we need to increment
         // the head/tail locations only when raceenabled.
         racenotify(c, c.recvx, nil)
         racenotify(c, c.recvx, sg)
         c.recvx++
         if c.recvx == c.dataqsiz {
            c.recvx = 0
         }
         c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
      }
   }
   if sg.elem != nil {
      sendDirect(c.elemtype, sg, ep)
      sg.elem = nil
   }
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   goready(gp, skip+1)
}

As you can see, this function does two things:

  • It calls sendDirect to copy the value being sent straight to the address of the receiving goroutine's variable;
  • It calls goready to wake that goroutine, setting its state to _Grunnable and placing it in the processor's run queue as the next goroutine to run.
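
The direct-send path is what an unbuffered channel uses whenever a receiver is already parked; a rough demonstration (the sleeps only make the ordering visible):

package main

import (
   "fmt"
   "time"
)

func main() {
   ch := make(chan string) // unbuffered: sends always use the direct path

   go func() {
      // This receiver parks itself in recvq until a sender shows up.
      fmt.Println("received:", <-ch)
   }()

   time.Sleep(100 * time.Millisecond) // let the receiver block first
   ch <- "hello"                      // copied straight onto the receiver's stack, then goready
   time.Sleep(100 * time.Millisecond) // give the woken goroutine time to print
}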

3.3 Sending into the buffer

If no goroutine is waiting to receive, but the channel was created with a buffer and the buffer is not yet full, the following code runs:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   ...
   if c.qcount < c.dataqsiz {
      // Space is available in the channel buffer. Enqueue the element to send.
      qp := chanbuf(c, c.sendx)
      if raceenabled {
         racenotify(c, c.sendx, nil)
      }
      typedmemmove(c.elemtype, qp, ep)
      c.sendx++
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }
   ...
}

Here chanbuf first computes the next free slot in the buffer, typedmemmove copies the value being sent into it, and then the sendx index and the qcount counter are incremented. A goroutine that later receives from the channel can read the value straight out of the buffer.
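
In other words, while the buffer has free slots a send returns immediately without any receiver being present; for example:

package main

import "fmt"

func main() {
   ch := make(chan int, 2)

   ch <- 1 // buffer slot 0, qcount = 1: returns immediately, no receiver needed
   ch <- 2 // buffer slot 1, qcount = 2: returns immediately
   // A third send here would take the blocking path described in 3.4,
   // because qcount == dataqsiz.

   fmt.Println(<-ch, <-ch) // prints: 1 2 (values come back in FIFO order)
}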

3.4 Blocking send

If no goroutine is waiting to receive and there is either no buffer or the buffer is already full, the sender has to block:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   ...

   if !block {
      unlock(&c.lock)
      return false
   }

   // Block on the channel. Some receiver will complete our operation for us.
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   c.sendq.enqueue(mysg)
   // Signal to anyone trying to shrink our stack that we're about
   // to park on a channel. The window between when this G's status
   // changes and when we set gp.activeStackChans is not safe for
   // stack shrinking.
   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
   // Ensure the value being sent is kept alive until the
   // receiver copies it out. The sudog has a pointer to the
   // stack object, but sudogs aren't considered as roots of the
   // stack tracer.
   KeepAlive(ep)

   // someone woke us up.
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   gp.activeStackChans = false
   closed := !mysg.success
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   if closed {
      if c.closed == 0 {
         throw("chansend: spurious wakeup")
      }
      panic(plainError("send on closed channel"))
   }
   return true
}
  • getg obtains the goroutine that is performing the send;
  • acquireSudog obtains a sudog structure and fills in the relevant fields;
  • the sudog from the previous step is enqueued on the channel's send waiting queue (sendq), and gopark suspends the current goroutine;
  • once a receiving goroutine arrives it wakes this goroutine, which then continues; if instead the channel is closed in the meantime, the code that follows the wakeup panics (see the sketch after this list).
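
A small demonstration of a send blocking on a full buffer until a receiver frees a slot (the sleeps only make the ordering visible):

package main

import (
   "fmt"
   "time"
)

func main() {
   ch := make(chan int, 1)
   ch <- 1 // buffer is now full

   go func() {
      ch <- 2 // no receiver and no free slot: this goroutine parks in sendq
      fmt.Println("sender woken up")
   }()

   time.Sleep(100 * time.Millisecond) // let the sender block
   fmt.Println(<-ch)                  // frees a slot and wakes the parked sender
   time.Sleep(100 * time.Millisecond)
   fmt.Println(<-ch)
}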

4. Receive data

4.1 Receiving from a nil or closed channel

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   // raceenabled: don't need to check ep, as it is always on the stack
   // or is new memory allocated by reflect.

   if debugChan {
      print("chanrecv: chan=", c, "\n")
   }

   if c == nil {
      if !block {
         return
      }
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }

   ...

   lock(&c.lock)

   if c.closed != 0 && c.qcount == 0 {
      if raceenabled {
         raceacquire(c.raceaddr())
      }
      unlock(&c.lock)
      if ep != nil {
         typedmemclr(c.elemtype, ep)
      }
      return true, false
   }

   ...
}

The above is the first part of the receive path; you can see:

  • Just as with sending, if the channel is nil a non-blocking receive returns immediately, while a blocking receive parks the goroutine forever;
  • Unlike sending, receiving from a closed channel is perfectly legal: once the buffer is empty it simply returns the element type's zero value together with false (see the snippet after this list).
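
The zero-value-plus-false behavior is easy to observe with the comma-ok form of a receive:

package main

import "fmt"

func main() {
   ch := make(chan int, 1)
   ch <- 42
   close(ch)

   v, ok := <-ch
   fmt.Println(v, ok) // prints: 42 true  (buffered data is still delivered after close)

   v, ok = <-ch
   fmt.Println(v, ok) // prints: 0 false  (closed and drained: zero value + false)
}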

4.2 Direct Receive

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   ...

   if sg := c.sendq.dequeue(); sg != nil {
      // Found a waiting sender. If buffer is size 0, receive value
      // directly from sender. Otherwise, receive from head of queue
      // and add sender's value to the tail of the queue (both map to
      // the same buffer slot because the queue is full).
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true, true
   }

   ...
}

When the channel's sendq contains goroutines in the waiting state, the one that has been waiting longest is dequeued and handed to the recv function:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   if c.dataqsiz == 0 {
      if raceenabled {
         racesync(c, sg)
      }
      if ep != nil {
         // copy data from sender
         recvDirect(c.elemtype, sg, ep)
      }
   } else {
      // Queue is full. Take the item at the
      // head of the queue. Make the sender enqueue
      // its item at the tail of the queue. Since the
      // queue is full, those are both the same slot.
      qp := chanbuf(c, c.recvx)
      if raceenabled {
         racenotify(c, c.recvx, nil)
         racenotify(c, c.recvx, sg)
      }
      // copy data from queue to receiver
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      // copy data from sender to queue
      typedmemmove(c.elemtype, qp, sg.elem)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   }
   sg.elem = nil
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   goready(gp, skip+1)
}

This function handles two cases separately, depending on whether the channel has a buffer:

  • If there is no buffer, recvDirect copies the value stored by the sending goroutine directly into the target memory address; in effect the receiver takes the data straight from that goroutine;
  • If there is a buffer, the value at the head of the buffer is copied to the target address first, and then the blocked sender's value is copied into the slot just freed at the tail. In other words, the receiver takes the element at the head of the queue while the waiting sender's element is appended at the tail; note that the buffer must be full at this point, so head and tail refer to the same slot.

In either case, goready is called at the end to wake gp, the sending goroutine.
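
A rough demonstration of the buffered case: the buffer is full, an extra sender is parked, and each receive delivers the head element while the parked sender's value slides into the tail, preserving FIFO order (the sleep only makes the ordering visible):

package main

import (
   "fmt"
   "time"
)

func main() {
   ch := make(chan int, 2)
   ch <- 1
   ch <- 2 // buffer is now full

   go func() {
      ch <- 3 // parks in sendq because the buffer is full
   }()
   time.Sleep(100 * time.Millisecond) // let the extra sender block

   // Each receive takes the head of the buffer; when the parked sender is
   // released its value goes to the tail, so the order stays 1, 2, 3.
   fmt.Println(<-ch, <-ch, <-ch) // prints: 1 2 3
}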

4.3 Receiving from the buffer

Strictly speaking, this section title is not entirely accurate: in 4.2 data may also be taken from the buffer. The difference is:

  • in 4.2 the buffer is full and there are senders blocked waiting on it;
  • in 4.3 there are no senders blocked waiting.

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   ...

   if c.qcount > 0 {
      // Receive directly from queue
      qp := chanbuf(c, c.recvx)
      if raceenabled {
         racenotify(c, c.recvx, nil)
      }
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return true, true
   }

   ...
}

As on the send side, if there is data in the buffer, the value is simply copied out of the buffer (and the vacated slot is cleared).

4.4 Blocking reception

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   ...

   if !block {
      unlock(&c.lock)
      return false, false
   }

   // no sender available: block on this channel.
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.param = nil
   c.recvq.enqueue(mysg)
   // Signal to anyone trying to shrink our stack that we're about
   // to park on a channel. The window between when this G's status
   // changes and when we set gp.activeStackChans is not safe for
   // stack shrinking.
   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

   // someone woke us up
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   gp.activeStackChans = false
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   success := mysg.success
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg)
   return true, success
}

Similar to a blocking send: if no goroutine is waiting to send, and there is either no buffer or the buffer is empty, the receiving goroutine is pushed onto recvq and parked with gopark until it is woken up (a small demonstration follows).
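
A receive with no buffered data and no pending sender simply waits until a sender arrives (the sleep only makes the blocking observable):

package main

import (
   "fmt"
   "time"
)

func main() {
   ch := make(chan int) // no buffer and, initially, no sender

   go func() {
      time.Sleep(100 * time.Millisecond)
      ch <- 7 // the arriving sender completes the receive and wakes main
   }()

   fmt.Println(<-ch) // main parks in recvq until the send above happens
}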

5. Close

func closechan(c *hchan) {
   if c == nil {
      panic(plainError("close of nil channel"))
   }

   lock(&c.lock)
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("close of closed channel"))
   }

   if raceenabled {
      callerpc := getcallerpc()
      racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
      racerelease(c.raceaddr())
   }

   c.closed = 1

   var glist gList

   // release all readers
   for {
      sg := c.recvq.dequeue()
      if sg == nil {
         break
      }
      if sg.elem != nil {
         typedmemclr(c.elemtype, sg.elem)
         sg.elem = nil
      }
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = unsafe.Pointer(sg)
      sg.success = false
      if raceenabled {
         raceacquireg(gp, c.raceaddr())
      }
      glist.push(gp)
   }

   // release all writers (they will panic)
   for {
      sg := c.sendq.dequeue()
      if sg == nil {
         break
      }
      sg.elem = nil
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = unsafe.Pointer(sg)
      sg.success = false
      if raceenabled {
         raceacquireg(gp, c.raceaddr())
      }
      glist.push(gp)
   }
   unlock(&c.lock)

   // Ready all Gs now that we've dropped the channel lock.
   for !glist.empty() {
      gp := glist.pop()
      gp.schedlink = 0
      goready(gp, 3)
   }
}

The code that closes a channel looks long, but after handling a few special cases it simply walks the receive and send waiting queues and wakes every parked goroutine there with goready.
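
A small demonstration of what that means for user code: parked receivers are woken with a zero value and ok == false, while a parked sender would be woken only to panic (the sleep just lets the receivers block first):

package main

import (
   "fmt"
   "time"
)

func main() {
   ch := make(chan int)
   done := make(chan struct{})

   // Two receivers parked in recvq.
   for i := 0; i < 2; i++ {
      go func(id int) {
         v, ok := <-ch // woken by closechan with the zero value and ok == false
         fmt.Println("receiver", id, "got", v, ok)
         done <- struct{}{}
      }(i)
   }

   time.Sleep(100 * time.Millisecond) // let the receivers block
   close(ch)
   <-done
   <-done

   // A goroutine parked in sendq would be woken too, and would then
   // panic with "send on closed channel".
}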

6. Summary

Although Go strongly advocates the CSP philosophy and recommends channels over protecting shared memory with locks, keep in mind:

Behind the scenes, channels use a lock to serialize access and provide thread safety. So by synchronizing access to memory with a channel you are actually using a lock: a lock wrapped in a thread-safe queue. How do Go's fancy locks compare to just using a mutex from the standard library's sync package? The following numbers were obtained by repeatedly calling Put on a single set, using Go's built-in benchmarking facilities.

`> BenchmarkSimpleSet-8 3000000 391 ns/op`
`> BenchmarkSimpleChannelSet-8 1000000 1699 ns/op`
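
The numbers above are quoted from the comparison referred to; the original benchmark code is not shown here, but a hypothetical skeleton of such a comparison (names like mutexSet and channelSet are illustrative, not the original code) might look like this:

package main

import (
   "sync"
   "testing"
)

// A set protected by a mutex.
type mutexSet struct {
   mu sync.Mutex
   m  map[int]struct{}
}

func (s *mutexSet) Put(v int) {
   s.mu.Lock()
   s.m[v] = struct{}{}
   s.mu.Unlock()
}

// A set owned by a single goroutine and fed through a channel.
type channelSet struct {
   puts chan int
}

func newChannelSet() *channelSet {
   s := &channelSet{puts: make(chan int)}
   go func() {
      m := make(map[int]struct{})
      for v := range s.puts {
         m[v] = struct{}{}
      }
   }()
   return s
}

func (s *channelSet) Put(v int) { s.puts <- v }

func BenchmarkSimpleSet(b *testing.B) {
   s := &mutexSet{m: make(map[int]struct{})}
   for i := 0; i < b.N; i++ {
      s.Put(i)
   }
}

func BenchmarkSimpleChannelSet(b *testing.B) {
   s := newChannelSet()
   for i := 0; i < b.N; i++ {
      s.Put(i)
   }
}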

In my personal understanding:

  • Use a channel when transferring data (passing ownership of it) between goroutines;
  • Use a lock (the sync package's Mutex/RWMutex) when protecting access to shared in-memory state;
  • Use channel together with select to get functionality similar to Linux's epoll (see the sketch after this list).
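
A minimal sketch of the last point: select multiplexes over several channels the way epoll multiplexes over several file descriptors, waking when any of them becomes ready:

package main

import (
   "fmt"
   "time"
)

func main() {
   a := make(chan string)
   b := make(chan string)

   go func() { time.Sleep(50 * time.Millisecond); a <- "from a" }()
   go func() { time.Sleep(80 * time.Millisecond); b <- "from b" }()

   for i := 0; i < 2; i++ {
      select {
      case msg := <-a:
         fmt.Println(msg)
      case msg := <-b:
         fmt.Println(msg)
      case <-time.After(time.Second):
         fmt.Println("timeout")
      }
   }
}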

The above is a detailed explanation of channels in Golang concurrent programming. For more information about Golang channels, please follow my other related articles!