go
channel
WaitGroup
runtime包
runtime.Gosched()
让出CPU时间片
package main
import (
"fmt"
"runtime"
)
func print(str string) {
for i := 0; i < 2; i++ {
fmt.Println(str)
}
}
func main() {
go print("go")
for i := 0; i < 2; i++ {
runtime.Gosched() // 让出CPU等待再次分配
fmt.Println("last")
}
}
运行结果
go
go
last
last
runtime.Goexit()
推出当前协程
package main
import (
"fmt"
"runtime"
"time"
)
func print(str string) {
for i := 0; i < 10; i++ {
if i >= 5 {
runtime.Goexit()
}
fmt.Println(str)
}
}
func main() {
go print("go")
time.Sleep(time.Second)
fmt.Println("last")
}
运行结果
go
go
go
go
go
last
runtime.GOMAXPROCS()
package main
import (
"fmt"
"runtime"
"time"
)
func printA(str string) {
for i := 0; i < 10; i++ {
fmt.Println(str)
}
}
func printB(str string) {
for i := 0; i < 10; i++ {
fmt.Println(str)
}
}
func main() {
fmt.Printf("cpu count %v\n", runtime.NumCPU())
// runtime.GOMAXPROCS(1)
go printA("goA")
go printB("goB")
time.Sleep(time.Second)
}
运行结果
Mutex互斥锁实现同步
除了使用channel实现同步之外,还可以使用Mutex互斥锁的方式实现同步
package main
import (
"fmt"
"sync"
"time"
)
var m int = 100
var lock sync.Mutex
var wt sync.WaitGroup
func add() {
defer wt.Done()
lock.Lock()
m += 1
time.Sleep(time.Millisecond * 10)
lock.Unlock()
}
func sub() {
defer wt.Done()
lock.Lock()
m -= 1
time.Sleep(time.Millisecond * 2)
lock.Unlock()
}
func main() {
for i := 0; i < 100; i++ {
go add()
wt.Add(1)
go sub()
wt.Add(1)
}
wt.Wait()
fmt.Printf("m: %v \n", m)
}
运行结果
100
遍历channel
方式一
package main
import (
"fmt"
)
func main() {
c := make(chan int)
go func() {
for i := 0; i < 10; i++ {
c <- i
}
close(c)
}()
for {
if data, ok := <-c; ok {
fmt.Printf("data %v \n", data)
} else {
break
}
}
}
运行结果
data 0
data 1
data 2
data 3
data 4
data 5
data 6
data 7
data 8
data 9
方法2 for range
package main
import (
"fmt"
)
func main() {
c := make(chan int)
go func() {
for i := 0; i < 10; i++ {
c <- i
}
close(c)
}()
for v := range c {
fmt.Printf("data %v \n", v)
}
}
运行结果
data 0
data 1
data 2
data 3
data 4
data 5
data 6
data 7
data 8
data 9
如果通道关闭,读多写少,channel中没有数据后,读到的就是默认值,例如 int类型读到的就是默认值0,如果没有关闭就会死锁
select switch
select时go中的一个控制结构,类似于switch语句,用于处理异步IO操作。select会监听case语句中channel的读写操作,当case中channel读写操作为非阻塞状态(能读能写)时,将会触发相应的动作
select中的语句必须是一个channel操作
select中的default子句总是可运行的
如果有多个case都可以运行,select会随机公平的选出一个执行,其他不会执行
如果没有可运行的case语句,且有default语句,那么就会执行default语句
如果没有可运行的case语句,且没有default语句,select将阻塞,直到某个case通信可以运行
eg
package main
import (
"fmt"
"time"
)
var chanInt = make(chan int)
var chanStr = make(chan string)
func main() {
go func() {
chanInt <- 100
chanStr <- "hello"
close(chanInt)
close(chanStr)
}()
for {
select {
case r := <-chanInt:
fmt.Printf("chanInt %v \n", r)
case r := <-chanStr:
fmt.Printf("chanInt %v \n", r)
default:
fmt.Printf("default \n")
}
time.Sleep(time.Second)
}
}
运行结果
default
chanInt 100
chanInt hello
chanInt
chanInt 0
chanInt
chanInt
chanInt 0
chanInt 0
chanInt 0
....
Timer
用来实现定时操作,Timer内部是用channel实现的
package main
import (
"fmt"
"time"
)
func main() {
timer1 := time.NewTimer(time.Second * 2)
t1 := time.Now()
fmt.Printf("t1 %v \n", t1)
t2 := timer1.C
fmt.Printf("t2 %v \n", t2)
fmt.Printf("==================\n")
// 与time.Sleep类似的实现
timer2 := time.NewTimer(time.Second * 2)
t1 = time.Now()
fmt.Printf("t1 %v \n", t1)
<-timer2.C
t3 := time.Now()
fmt.Printf("t2 %v \n", t3)
fmt.Printf("==================\n")
time.Sleep(time.Second * 2)
fmt.Printf("2秒后 \n")
<-time.After(time.Second * 2) // time.After函数返回值是 channel
fmt.Printf("time.After 2秒后 \n")
timer3 := time.NewTimer(time.Second)
go func() {
<-timer3.C
fmt.Printf("timer3 \n")
}()
// 打断time3定时器
stop := timer3.Stop()
if stop {
fmt.Printf("已经打断 timer3 \n")
}
fmt.Printf("before %v \n", time.Now())
timer4 := time.NewTimer(time.Second * 5) // 初始设置5s
timer4.Reset(time.Second * 1) // 重新设置为1s
<-timer4.C
fmt.Printf("after %v \n", time.Now())
}
运行结果
t1 2022-12-03 03:12:10.2425214 +0800 CST m=+0.001090001
t2 0xc000066060
==================
t1 2022-12-03 03:12:10.2473406 +0800 CST m=+0.005909201
t2 2022-12-03 03:12:12.2483163 +0800 CST m=+2.006884901
==================
2秒后
time.After 2秒后
已经打断 timer3
before 2022-12-03 03:12:16.251148 +0800 CST m=+6.009716601
after 2022-12-03 03:12:17.2520727 +0800 CST m=+7.01064130
Ticker
package main
import (
"fmt"
"time"
)
func main() {
chanInt := make(chan int)
ticker := time.NewTicker(time.Second)
go func() {
for _ = range ticker.C {
select {
case chanInt <- 1:
case chanInt <- 2:
case chanInt <- 3:
}
}
}()
sum := 0
for v := range chanInt {
fmt.Printf("接收 %v \n", v)
sum += v
fmt.Printf("sum %v \n", sum)
break
}
}
运行结果
接收 3
sum 3
原子变量
并发情况下共享变量会存在线程安全问题,加锁可以解决线程安全问题但是效率较低,可以使用原子变量解决该问题
package main
import (
"fmt"
"sync/atomic"
"time"
)
var i int32 = 100
func add() {
atomic.AddInt32(&i, 1)
}
func sub() {
atomic.AddInt32(&i, -1)
}
func main() {
for i := 0; i < 100; i++ {
go add()
go sub()
}
time.Sleep(time.Second)
fmt.Println(i)
}
运行结果
100
常用的原子变量操作
- 增减
- 载入
- 比较交换cas
- 交换
- 存储
增减
func AddInt32(addr *int32, delta int32) (new int32)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
载入
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
比较交换
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
交换
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
存储
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
评论区