协程和管道
- 1、协程
- 1.1、进程、线程和协程
- 1.2、goroutine的使用以及sync.WaitGroup
- 1.3、启动多个协程
- 1.4、设置Golang并行运行的时候占用的cup数量
- 1.5、goroutine统计素数
- 2、管道
- 2.1、管道的操作
- 2.2、协程和管道协同
- 2.3、单向管道
- 2.4、多路复用之select
- 2.5、解决协程中出现的异常问题
- 3、Golang协程同步与互斥
- 3.1、互斥锁
- 3.2、读写锁
- 3.3、条件变量
1、协程
1.1、进程、线程和协程
进程(Process)就是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位,进程是一个动态概念,是程序在执行过程中分配和管理资源的基本单位,每一个进程都有一个自己的地址空间。一个进程至少有 5 种基本状态,它们是:初始态,执行态,等待状态,就绪状态,终止状态。
线程是进程的一个执行实例,是程序执行的最小单元,它是比进程更小的能独立运行的基本单位
并发:多个线程同时竞争一个位置,竞争到的才可以执行,每一个时间段只有一个线程在执行。
并行:多个线程可以同时执行,每一个时间段,可以有多个线程同时执行。
通俗的讲多线程程序在单核CPU上面运行就是并发,多线程程序在多核CUP上运行就是并行,如果线程数大于CPU核数,则多线程程序在多个CPU上面运行既有并行又有并发。
Golang中的协程:
Golang中的主线程:(可以理解为线程/也可以理解为进程),在一个Golang程序的主线程上可以起多个协程。Golang 中多协程可以实现并行或者并发。
协程:可以理解为用户级线程,这是对内核透明的,也就是系统并不知道有协程的存在,是完全由用户自己的程序进行调度的。Golang的一大特色就是从语言层面原生支持协程,在函数或者方法前面加go关键字就可创建一个协程。可以说Golang中的协程就是goroutine 。
Golang 中的多协程有点类似其他语言中的多线程。
多协程和多线程:Golang 中每个goroutine (协程) 默认占用内存远比Java 、C的线程少。
OS线程(操作系统线程)一般都有固定的栈内存(通常为 2MB 左右),一个goroutine (协程)占用内存非常小,只有 2KB 左右,多协程 goroutine切换调度开销方面远比线程要少。
1.2、goroutine的使用以及sync.WaitGroup
下面实现创建一个协程,在协程和主线程中分别执行打印语句,每次休眠一秒。
package mainimport ("fmt""time"
)func test() {for i := 0; i < 3; i++ {fmt.Println("test...")time.Sleep(time.Second)}
}func main() {go test()for i := 0; i < 3; i++ {fmt.Println("main...")time.Sleep(time.Second)}
}
但是有个问题,如果主线程执行的速度比较快呢,我们可以修改一下代码,让主线程跑快一些。
此时我们发现主线程执行完后,协程不会再继续执行了。这是因为主线程执行完后整个程序就退出了。
所以我们需要使用sync.WaitGroup来让主线程等待协程。
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc test() {for i := 0; i < 3; i++ {fmt.Println("test...")time.Sleep(time.Second)}wg.Done()
}func main() {wg.Add(1)go test()for i := 0; i < 3; i++ {fmt.Println("main...")time.Sleep(100)}wg.Wait()
}
可以看到此时主线程执行完后会等待协程执行完,然后才会退出。有点类似于创建进程/线程并进行进程/线程等待回收。
其中:sync.WaitGroup本质上是一个计数器,Add方法表示增加计数器,Done表示让计数器减1,Wait表示等待协程执行完毕。
1.3、启动多个协程
package mainimport ("fmt""sync"
)var wg sync.WaitGroupfunc test(id int) {for i := 1; i <= 3; i++ {fmt.Printf("我是协程[%v]..., i=%d\n", id, i)}wg.Done()
}func main() {for i := 1; i <= 5; i++ {wg.Add(1)go test(i)}wg.Wait()
}
1.4、设置Golang并行运行的时候占用的cup数量
可以使用runtime.NumCPU()来获取当前计算机上CPU核心的数量。
Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个 OS 线程来同时执行Go代码。默认值是机器上的 CPU 核心数。 例如在一个 8 核心的机器上,调度器会把 Go 代码同时调度到8个OS线程上。
Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。
package mainimport ("fmt""runtime"
)func main() {num := runtime.NumCPU()fmt.Println("CPU数量为:", num)runtime.GOMAXPROCS(num - 1)
}
1.5、goroutine统计素数
现在假设要统计1->50000000中有多少素数,最普遍的做法是使用一个for循环来做,如下:
package mainimport ("fmt""time"
)func main() {u1 := time.Now().Unix()// var cnt = 0for i := 2; i <= 50000000; i++ {flag := truefor j := 2; j*j <= i; j++ {if i%j == 0 {flag = falsebreak}}if flag {// cnt++}}// fmt.Println("共有素数:", cnt)u2 := time.Now().Unix()fmt.Println("花费时间:", u2-u1)
}
我们发现运行时间高达12S。下面我们使用goroutine试试看:
我们创建5个协程来完成,每个协程处理一千万个数据。
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc test(x int) {for i := (x-1)*10000000 + 1; i <= x*10000000; i++ {if i == 1 {continue}flag := truefor j := 2; j*j <= i; j++ {if i%j == 0 {flag = falsebreak}}if flag {}}wg.Done()
}func main() {u1 := time.Now().Unix()for i := 1; i <= 5; i++ {wg.Add(1)go test(i)}wg.Wait()u2 := time.Now().Unix()fmt.Println("花费时间:", u2-u1)
}
可以看到这里我们花费时间大大的降低了,那如果我们想实现几个协程判断素数,其中一个协程进行打印呢?就需要使用到下面的管道了。
2、管道
管道是Golang在语言级别上提供的goroutine间的通讯方式,我们可以使用channel在多个goroutine之间传递消息。如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Golang的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。
Go语言中的管道(channel)是一种特殊的类型。管道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个管道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
2.1、管道的操作
1、channel类型。
2、创建管道需要使用make函数。
3、channel操作。
管道有发送(send)、接收(receive)和关闭(close)三种操作。其中发送和接收都需要使用<-符号,例子如下:
package mainimport "fmt"func main() {// 1.创建管道ch := make(chan int, 3)// 2.给管道发送数据ch <- 10ch <- 20ch <- 30// 3.从管道获取数据a := <-chfmt.Println(a)fmt.Printf("值: %v, 类型: %T, 长度: %d, 容量: %d\n", ch, ch, len(ch), cap(ch))
}
管道有点类似队列的结构特点,所以获取的a值为10。另外打印管道返回的是一个地址,容量为3,由于我们取出了一个数据,所以长度为2。
4、管道是引用数据类型。
package mainimport "fmt"func main() {ch := make(chan int, 3)ch <- 10ch <- 20ch2 := chch2 <- 30<-ch<-chfmt.Println(<-ch2)
}
5、管道阻塞
当管道中没有数据,再去取就会阻塞。当管道中数据写满了,再去取也会阻塞。
package mainfunc main() {ch := make(chan int, 3)ch <- 10ch <- 20ch <- 30ch <- 40
}
package mainimport "fmt"func main() {ch := make(chan int, 3)ch <- 10ch <- 20<-ch<-chnum := <-chfmt.Println(num)
}
6、for range遍历管道
package mainimport "fmt"func main() {var ch = make(chan int, 5)for i := 1; i <= 5; i++ {ch <- i}for v := range ch {fmt.Println(v)}
}
注意:for range遍历管道只有一个返回值,并且会报错。
解决办法:调用close函数关闭管道,这样for range遍历结束就会退出,不会报错。
package mainimport "fmt"func main() {var ch = make(chan int, 5)for i := 1; i <= 5; i++ {ch <- i}close(ch)for v := range ch {fmt.Println(v)}
}
另外还可以直接使用for循环遍历,不过需要知道管道中的元素个数。
package mainimport "fmt"func main() {var ch = make(chan int, 5)for i := 1; i <= 5; i++ {ch <- i}for i := 0; i < len(ch); i++ {fmt.Println(<-ch)}
}
2.2、协程和管道协同
7、需求:使用goroutine和channel协同工作。
- 开启一个协程向管道中写入数据。
- 开启一个协程丛管道中读取数据。
- 主线程必须等协程操作完才能退出。
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc fn1(ch chan int) {for i := 1; i <= 5; i++ {ch <- ifmt.Println("协程[1]向管道中写入数据:", i)time.Sleep(500)}close(ch)wg.Done()
}func fn2(ch chan int) {for i := 1; i <= 5; i++ {x := <-chfmt.Println("协程[2]从管道中读取数据:", x)time.Sleep(500)}wg.Done()
}func main() {var ch = make(chan int, 5)wg.Add(2)go fn1(ch)go fn2(ch)wg.Wait()
}
这里先读取是因为协程1已经把数据写入了,只不过还没打印出来就被协程2取走并打印输出了。管道是自带同步和互斥机制的,所以哪怕让协程2休眠时间远短于协程1,协程2也会阻塞住等待。
再来看另外一个例子,使用go关键字配合匿名自执行函数创建协程。
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc main() {var ch = make(chan int, 3)wg.Add(1)go func() {for i := 1; i <= 3; i++ {num := <-chfmt.Println(num)}wg.Done()}()wg.Add(1)go func() {for i := 1; i <= 3; i++ {time.Sleep(time.Second)ch <- i}wg.Done()}()wg.Wait()
}
这里有点类似于C++中通过lambda表达式创建线程执行。
需求:改善上面实现的素数判断,还是创建多个协程来判断素数,但是我们还要创建一个协程来打印素数,这就需要实现协程间通信,所以就需要使用协程+管道。
- 创建一个管道intChain和一个协程,这个协程负责写入需要判断的值,然后判断素数的协程从管道intChain中获取数据进行判断。
- 创建16个协程和一个管道primeChain,这16个协程从上面的管道intChain中获取数据进行判断,如果是素数就写入到新创建的管道primeChain中。
- 创建一个打印素数的协程,该协程从存放素数的管道primeChain中获取数据打印输出。
- 主线程进行等待
但是还要注意,我们打印素数协程是使用for range遍历管道的,所以需要close管道,而我们不能在执行方法中随意close管道,因为可能其他协程还要写入,所以还需要一个exitChain来标识,当判断素数的协程执行完就向exitChain写入true。然后我们另外创建一个协程来读取exitChain,当十六次全部读取完毕就可以关闭primeChain,这样for range就不会出错了。
实现代码如下:
package mainimport ("fmt""sync"
)var wg sync.WaitGroupfunc putNum(intChain chan int) {for i := 2; i <= 100; i++ {intChain <- i}close(intChain)wg.Done()
}func isPrime(intChain chan int, primeChain chan int, exitChain chan bool) {for v := range intChain {flag := truefor j := 2; j*j <= v; j++ {if v%j == 0 {flag = falsebreak}}if flag {primeChain <- v}}exitChain <- truewg.Done()
}func printPrime(primeChain chan int) {for v := range primeChain {fmt.Printf("%v是素数\n", v)}wg.Done()
}func main() {intChan := make(chan int, 1000)primeChan := make(chan int, 1000)exitChan := make(chan bool, 16)wg.Add(1)go putNum(intChan)for i := 0; i < 16; i++ {wg.Add(1)go isPrime(intChan, primeChan, exitChan)}wg.Add(1)go printPrime(primeChan)wg.Add(1)go func() {for i := 0; i < 16; i++ {<-exitChan}close(primeChan)wg.Done()}()wg.Wait()fmt.Println("执行完毕...")
}
2.3、单向管道
有的时候我们会将管道作为参数在多个任务函数间传递, 很多时候我们在不同的任务函数中使用管道都会对其进行限制,比如限制管道在函数中只能发送或只能接收。
单向管道的实现如下,在chan左边或右边添加<-。
// 声明为只写
var chan1 chan<- int
chan1 = make(chan int, 3)// 声明为只读
var chan2 <-chan int
chan2 = make(chan int, 3)
举个例子,创建两个协程实现一个协程向管道中写入数据,另一个协程从管道中读取数据。按之前的写法如下:
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc fn1(ch chan int) {for i := 1; i <= 5; i++ {ch <- ifmt.Println("协程[1]向管道中写入:", i)time.Sleep(time.Millisecond * 100)}close(ch)wg.Done()
}func fn2(ch chan int) {for v := range ch {fmt.Println("协程[2]从管道中读取:", v)time.Sleep(time.Millisecond * 100)}wg.Done()
}func main() {var ch = make(chan int, 5)wg.Add(1)go fn1(ch)wg.Add(1)go fn2(ch)wg.Wait()
}
这么写其实也没有什么问题,但是我们可以在函数参数上进一步限制管道,对于fn1来说,该管道只进行写入,对于fn2来说,该管道只进行读取,所以可以修改成下面的代码:
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc fn1(ch chan<- int) {for i := 1; i <= 5; i++ {ch <- ifmt.Println("协程[1]向管道中写入:", i)time.Sleep(time.Millisecond * 100)}close(ch)wg.Done()
}func fn2(ch <-chan int) {for v := range ch {fmt.Println("协程[2]从管道中读取:", v)time.Sleep(time.Millisecond * 100)}wg.Done()
}func main() {var ch = make(chan int, 5)wg.Add(1)go fn1(ch)wg.Add(1)go fn2(ch)wg.Wait()
}
2.4、多路复用之select
在某些场景下我们需要同时从多个管道中读取数据,这时候就可以使用多路复用技术。多路复用本质上是一种就绪事件的通知机制。
select的使用类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个管道的通信(接收或发送) 过程。select会一直等待,直到底层事件就绪时, 就会执行case分支对应的语句。 具体格式如下:
当读取完所有数据后就会走default。
例如下面读取两个管道中的数据,可以创建两个协程来读取,也可以使用多路复用。
package mainimport "fmt"func main() {intChan := make(chan int, 10)for i := 1; i <= 10; i++ {intChan <- i}strChan := make(chan string, 5)for i := 1; i <= 5; i++ {strChan <- fmt.Sprintf("hello-%d", i)}for {select {case v := <-intChan:fmt.Println("从intChan中获取数据:", v)case v := <-strChan:fmt.Println("从strChan中获取数据:", v)default:fmt.Println("数据获取完毕...")return}}
}
注意:
1、走到default表示管道中的数据都获取完毕了,由于外层是for死循环,所以需要return退出。
2、使用select来获取管道中的数据,不需要close管道。
2.5、解决协程中出现的异常问题
package mainimport ("fmt""time"
)func print() {for i := 0; i < 5; i++ {fmt.Println("hello...")}
}func test() {var m map[string]stringm["username"] = "张三"
}func main() {go print()go test()time.Sleep(time.Second)
}
在上面的test中,由于我们只是声明了m,没有使用make函数来创建空间,所以该协程出现异常导致整个程序崩溃,类似于C/C++中线程出现异常导致整个进程崩溃。
所以我们可以使用defer + recover来解决。
package mainimport ("fmt""time"
)func print() {for i := 0; i < 5; i++ {fmt.Println("hello...")}
}func test() {defer func() {err := recover()if err != nil {fmt.Println("err:", err)}}()var m map[string]stringm["username"] = "张三"
}func main() {go print()go test()time.Sleep(time.Second)
}
3、Golang协程同步与互斥
3.1、互斥锁
多协程访问共享资源不加以保护就会出问题,下面用多协程模拟一轮抢票。
package mainimport ("fmt""sync""time"
)var ticket = 10000
var wg sync.WaitGroupfunc GetTicket(i int) {for {if ticket > 0 {time.Sleep(time.Microsecond * 1000)fmt.Printf("协程[%d]抢到票: %d\n", i, ticket)ticket--} else {break}}wg.Done()
}func main() {for i := 1; i <= 4; i++ {wg.Add(1)go GetTicket(i)}wg.Wait()
}
多协程共享全局变量,在进行抢票的时候我们发现多个协程竟然抢到同一张票,所以我们需要加锁保护。Golang中的互斥量使用很简单,只需要在全局定义一个sync.Mutex对象,调用其中的Lock和Unlock方法即可。
package mainimport ("fmt""sync""time"
)var ticket = 10000
var wg sync.WaitGroup
var mutex sync.Mutexfunc GetTicket(i int) {for {mutex.Lock()if ticket > 0 {time.Sleep(time.Microsecond * 1000)fmt.Printf("协程[%d]抢到票: %d\n", i, ticket)ticket--} else {mutex.Unlock()break}mutex.Unlock()}wg.Done()
}func main() {for i := 1; i <= 4; i++ {wg.Add(1)go GetTicket(i)}wg.Wait()
}
3.2、读写锁
读写锁保证任何时刻只有读者或者只有写者,如果是写者只能有一个写者,如果是读者可以有多个读者。使用如下:
var rwMtx sync.RWMutex // 定义读写锁
rwMtx.Lock() //写者加锁
rwMtx.Unlock() //写者解锁
rwMtx.RLock() // 读者加锁
rwMtx.RUnlock() // 读者解锁
下面创建一个协程协程写入数据,另一批协程读取数据。
package mainimport ("fmt""sync"
)var wg sync.WaitGroup
var rwMtx sync.RWMutexfunc read() {rwMtx.RLock()fmt.Println("协程读取数据...")rwMtx.RUnlock()wg.Done()
}func write() {rwMtx.Lock()fmt.Println("协程写入数据...")rwMtx.Unlock()wg.Done()
}func main() {for i := 0; i < 10; i++ {wg.Add(1)go write()}for i := 0; i < 10; i++ {wg.Add(1)go read()}wg.Wait()
}
3.3、条件变量
条件变量是用来实现协程同步和互斥的。使用如下:
var mutex sync.Mutex
var cond = sync.NewCond(&mutex) // 传入锁初始化条件变量
cond.Wait() // 等待条件变量
cond.Signal() // 唤醒一个协程
cond.Broadcast() // 唤醒所有协程
cond.L.Lock() // 加锁,本质使用的是传入的mutex锁
cond.L.Unlock() // 解锁,本质使用的是传入的mutex锁
加锁可以直接使用条件变量提供的方法加锁,也可以使用我们定义的锁来加锁,但是要保证是同一把锁。
下面使用条件变量实现协程同步和互斥,需求:两个协程交替打印奇数和偶数。
package mainimport ("fmt""sync""time"
)var mutex sync.Mutex
var cond = sync.NewCond(&mutex)
var wg sync.WaitGroup
var x = 1
var flag = truefunc fn1() {for {cond.L.Lock()for !flag {cond.Wait()}fmt.Println("协程[1]打印数据:", x)x++flag = falsetime.Sleep(time.Second)cond.Signal()cond.L.Unlock()}wg.Done()
}func fn2() {for {cond.L.Lock()for flag {cond.Wait()}fmt.Println("协程[2]打印数据:", x)x++flag = truetime.Sleep(time.Second)cond.Signal()cond.L.Unlock()}wg.Done()
}func main() {wg.Add(1)go fn1()wg.Add(1)go fn2()wg.Wait()
}
由于管道自带同步互斥保护机制,所以也可以使用协程+管道来实现。
package mainimport ("fmt""sync""time"
)var x = 1
var wg sync.WaitGroupfunc fn1(ch1 <-chan bool, ch2 chan<- bool) {for {<-ch1fmt.Println("协程[1]打印数据:", x)time.Sleep(time.Second)x++ch2 <- true}wg.Done()
}func fn2(ch1 chan<- bool, ch2 <-chan bool) {for {<-ch2fmt.Println("协程[2]打印数据:", x)time.Sleep(time.Second)x++ch1 <- true}wg.Done()
}func main() {var ch1 = make(chan bool, 1)var ch2 = make(chan bool, 1)ch1 <- truewg.Add(1)go fn1(ch1, ch2)wg.Add(2)go fn2(ch1, ch2)wg.Wait()
}