Concurrency synchronization is about controlling concurrent computations:
- to avoid data races between them.
- to avoid them consuming CPU resources when they have nothing to do.
- Channel.
- Gracefully close channels.
- The `sync` standard package.
- Atomic operations provided in the `sync/atomic` standard package.
Source:
"Don't (let computations) communicate by sharing memory, (let them) share memory by communicating (through channels)." - Rob Pike
- Channels let goroutines share memory by communicating.
- A channel behaves like a FIFO (first in, first out) queue.
- Each channel type has an element type.
- Channel types can be bidirectional or single-directional:
  - `chan T` denotes a bidirectional channel type (send + receive).
  - `chan<- T` denotes a send-only channel type.
  - `<-chan T` denotes a receive-only channel type.
- Each channel value has a capacity. A channel value with zero capacity is called an unbuffered channel, and a channel value with non-zero capacity is called a buffered channel.
- Five operations:
  - Close the channel: `close(ch)`
  - Send a value: `ch <- v`
  - Receive a value: `<-ch`
  - Query the value buffer capacity of the channel: `cap(ch)` (returns zero if `ch` is a nil channel).
  - Query the current number of values in the value buffer (the length) of the channel: `len(ch)` (returns zero if `ch` is a nil channel).
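- Most basic operations in Go are not synchronized, that is, they are not safe for concurrent use. Channel operations are the exception: they are already synchronized.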
- How each kind of operation behaves on nil, closed, and not-closed non-nil channels:

Operation | A nil channel | A closed channel | A not-closed non-nil channel |
---|---|---|---|
Close | panics | panics | succeeds |
Send value to | blocks forever | panics | blocks or succeeds |
Receive value from | blocks forever | never blocks (returns buffered values, then zero values) | blocks or succeeds |
- Channel element values are transferred by copy.
- A channel cannot be garbage collected while any goroutine is blocked in its sending or receiving goroutine queue, because those goroutines reference it. Likewise, a goroutine that is blocked in either queue of a channel cannot be garbage collected, even if the channel is referenced only by this goroutine. In fact, a goroutine can only be garbage collected after it has exited.
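The behaviors in the table above can be checked with a small program. This is a minimal sketch, not part of the original notes: the helpers `sendPanics` and `closePanics` are hypothetical names introduced here, and they use `recover` to turn a panic into a boolean. The two-value receive form `v, ok := <-ch` is used to tell a real value from the zero value returned by a closed, drained channel.

```go
package main

import "fmt"

// sendPanics (hypothetical helper) reports whether sending v to ch panics.
// Only call it with a closed channel or a channel with free buffer space;
// a send to a nil or full channel would block instead of returning.
func sendPanics(ch chan int, v int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch <- v
	return false
}

// closePanics (hypothetical helper) reports whether close(ch) panics.
func closePanics(ch chan int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	close(ch)
	return false
}

func main() {
	var nilCh chan int
	fmt.Println(len(nilCh), cap(nilCh)) // 0 0
	fmt.Println(closePanics(nilCh))     // true: closing a nil channel panics

	ch := make(chan int, 2) // buffered channel with capacity 2
	ch <- 7
	fmt.Println(len(ch), cap(ch)) // 1 2

	fmt.Println(closePanics(ch))   // false: the first close succeeds
	fmt.Println(closePanics(ch))   // true: closing a closed channel panics
	fmt.Println(sendPanics(ch, 8)) // true: sending to a closed channel panics

	// Receiving from a closed channel never blocks: buffered values are
	// drained first, then zero values are returned with ok == false.
	v, ok := <-ch
	fmt.Println(v, ok) // 7 true
	v, ok = <-ch
	fmt.Println(v, ok) // 0 false
}
```

The blocking cases (sending to or receiving from a nil channel) are deliberately left out, since a program that exercises them would hang.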
`for-range` on channels: the loop receives the values sent to a channel one by one, until the channel is closed and its value buffer is empty.
// If channel is nil, the loop blocks here forever.
for v := range channel {
	// use v
}
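To illustrate how such a loop terminates, here is a small sketch in which the sender closes the channel when it is done. The function names `produce` and `sum` are assumptions made for this example only.

```go
package main

import "fmt"

// produce (hypothetical helper) sends 0..n-1 on a new channel and then
// closes it, so that a for-range loop over the channel can terminate.
func produce(n int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- i
		}
	}()
	return ch
}

// sum (hypothetical helper) receives values until the channel is
// closed and drained, and returns their total.
func sum(ch <-chan int) int {
	total := 0
	for v := range ch {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sum(produce(5))) // 10
}
```

If `produce` never closed the channel, the loop in `sum` would block forever after the last value.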
`select-case`:
package main

import "fmt"

func main() {
	var c chan struct{} // nil
	select {
	case <-c: // blocking operation
	case c <- struct{}{}: // blocking operation
	default:
		fmt.Println("Go here.")
	}
}
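The same `default` branch can be used for a non-blocking receive. The following is a minimal sketch under that assumption; `tryReceive` is a hypothetical helper name, not something from the notes above.

```go
package main

import "fmt"

// tryReceive (hypothetical helper) performs a non-blocking receive.
// It returns ok == false when no value is ready (including when ch is
// nil) and also when ch is closed and drained.
func tryReceive(ch <-chan string) (string, bool) {
	select {
	case v, ok := <-ch:
		return v, ok
	default:
		return "", false
	}
}

func main() {
	var nilCh chan string
	_, ok := tryReceive(nilCh)
	fmt.Println(ok) // false: a receive from a nil channel is never ready

	ch := make(chan string, 1)
	_, ok = tryReceive(ch)
	fmt.Println(ok) // false: the buffer is empty

	ch <- "hello"
	v, ok := tryReceive(ch)
	fmt.Println(v, ok) // hello true
}
```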
- Use channels as Futures/Promises.
  - Return receive-only channels as results.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func longTimeRequest() <-chan int32 {
	r := make(chan int32)
	go func() {
		// Simulate a workload.
		time.Sleep(time.Second * 3)
		r <- rand.Int31n(100)
	}()
	return r
}

func sumSquares(a, b int32) int32 {
	return a*a + b*b
}

func main() {
	rand.Seed(time.Now().UnixNano())
	a, b := longTimeRequest(), longTimeRequest()
	// Takes about 3 seconds, not 6: both requests run concurrently.
	fmt.Println(sumSquares(<-a, <-b))
}
  - Pass send-only channels as arguments.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func longTimeRequest(r chan<- int32) {
	// Simulate a workload.
	time.Sleep(time.Second * 3)
	r <- rand.Int31n(100)
}

func sumSquares(a, b int32) int32 {
	return a*a + b*b
}

func main() {
	rand.Seed(time.Now().UnixNano())
	ra, rb := make(chan int32), make(chan int32)
	go longTimeRequest(ra)
	go longTimeRequest(rb)
	fmt.Println(sumSquares(<-ra, <-rb))
}
- The first response wins.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func source(c chan<- int32) {
	ra, rb := rand.Int31(), rand.Intn(3)+1
	// Sleep 1s/2s/3s.
	time.Sleep(time.Duration(rb) * time.Second)
	c <- ra
}

func main() {
	rand.Seed(time.Now().UnixNano())
	startTime := time.Now()
	// c must be a buffered channel.
	c := make(chan int32, 5)
	for i := 0; i < cap(c); i++ {
		go source(c)
	}
	// Only the first response will be used.
	rnd := <-c
	fmt.Println(time.Since(startTime))
	fmt.Println(rnd)
}
- Use channels for notifications.
// WIP
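Until this part is filled in, one possible shape of a notification is sketched below. It is an assumption about what the heading refers to, not the author's example: a goroutine closes a `chan struct{}` to signal that it has finished, and the waiter blocks on a receive. The names `worker` and `runWorker` are hypothetical.

```go
package main

import "fmt"

// worker (hypothetical) fills result, then closes done to notify the
// waiter. Closing, rather than sending, lets any number of receivers
// observe the notification.
func worker(result *[]int, done chan<- struct{}) {
	defer close(done)
	for i := 1; i <= 3; i++ {
		*result = append(*result, i*i)
	}
}

// runWorker (hypothetical) starts worker and waits for its notification.
func runWorker() []int {
	var result []int
	done := make(chan struct{}) // struct{} carries no data, only the signal
	go worker(&result, done)
	<-done // blocks until worker closes done
	return result
}

func main() {
	fmt.Println(runWorker()) // [1 4 9]
}
```

The element type `struct{}` is chosen because its values occupy no memory, which makes the intent (signal only, no payload) explicit.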