并发是Go语言最强大的功能之一,掌握它是构建可扩展和高效应用程序的关键。以下是您应该了解的7种Go并发模式。

1. 工作池模式 (Worker Pool)

工作池模式涉及创建固定数量的goroutine,它们从共享队列中处理任务。此模式对于控制并发任务的数量非常有用,这对管理资源使用至关重要。

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		fmt.Printf("Worker %d started job %d\n", id, job)
		time.Sleep(time.Second)
		fmt.Printf("Worker %d finished job %d\n", id, job)
		results <- job * 2
	}
}

func main() {
	const numJobs = 5
	const numWorkers = 3
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)
	var wg sync.WaitGroup

	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, results, &wg)
	}

	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	wg.Wait()
	close(results)

	for result := range results {
		fmt.Println("Result:", result)
	}
}

实际场景: 处理传入 HTTP 请求的 Web 服务器,其中每个请求都由池中的工作线程处理。

2. 扇出扇入模式 (Fan-Out, Fan-In)

扇出是指启动多个 goroutine 来处理数据,扇入是指将这些 goroutine 的结果组合到单个管道中。此模式对于并行处理然后收集结果非常有用。

package main

import (
	"fmt"
	"sync"
)

func producer(id int, ch chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < 5; i++ {
		ch <- i
		fmt.Printf("Producer %d produced %d\n", id, i)
	}
}

func consumer(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for v := range in {
		out <- v * 2
		fmt.Printf("Consumer %d processed %d\n", id, v)
	}
}

func main() {
	numProducers := 2
	numConsumers := 2
	input := make(chan int, 10)
	output := make(chan int, 10)
	var wg sync.WaitGroup

	for i := 1; i <= numProducers; i++ {
		wg.Add(1)
		go producer(i, input, &wg)
	}
	wg.Wait()
	close(input)

	for i := 1; i <= numConsumers; i++ {
		wg.Add(1)
		go consumer(i, input, output, &wg)
	}
	wg.Wait()
	close(output)

	for result := range output {
		fmt.Println("Result:", result)
	}
}

实际场景: 数据处理管道,其中不同的处理阶段由不同的工作线程集处理。

3. 管道模式 (Pipeline)

管道模式涉及将一系列阶段链接在一起,其中每个阶段都对数据执行转换并将其传递到下一个阶段。它在数据需要经过多个顺序处理步骤的情况下很有用。

package main

import "fmt"

func stage1(nums []int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

func stage2(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * 2
		}
		close(out)
	}()
	return out
}

func stage3(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n + 1
		}
		close(out)
	}()
	return out
}

func main() {
	nums := []int{1, 2, 3, 4, 5}
	c1 := stage1(nums)
	c2 := stage2(c1)
	c3 := stage3(c2)
	for result := range c3 {
		fmt.Println(result)
	}
}

实际场景: 图像处理系统,其中图像经过多个阶段,例如调整大小、过滤和编码。

4. 发布-订阅模式 (Publish-Subscribe)

发布-订阅模式允许将消息发布给多个订阅者。此模式在不同服务需要独立响应某些事件或消息类型的系统中很有用。

package main

import (
	"fmt"
	"sync"
	"time"
)

type PubSub struct {
	mu       sync.Mutex
	channels map[string][]chan string
}

func NewPubSub() *PubSub {
	return &PubSub{
		channels: make(map[string][]chan string),
	}
}

func (ps *PubSub) Subscribe(topic string) <-chan string {
	ch := make(chan string)
	ps.mu.Lock()
	ps.channels[topic] = append(ps.channels[topic], ch)
	ps.mu.Unlock()
	return ch
}

func (ps *PubSub) Publish(topic, msg string) {
	ps.mu.Lock()
	for _, ch := range ps.channels[topic] {
		ch <- msg
	}
	ps.mu.Unlock()
}

func (ps *PubSub) Close(topic string) {
	ps.mu.Lock()
	for _, ch := range ps.channels[topic] {
		close(ch)
	}
	ps.mu.Unlock()
}

func main() {
	ps := NewPubSub()

	subscriber1 := ps.Subscribe("news")
	subscriber2 := ps.Subscribe("news")

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		for msg := range subscriber1 {
			fmt.Println("Subscriber 1 received:", msg)
		}
	}()

	go func() {
		defer wg.Done()
		for msg := range subscriber2 {
			fmt.Println("Subscriber 2 received:", msg)
		}
	}()

	ps.Publish("news", "Breaking News!")
	ps.Publish("news", "Another News!")

	time.Sleep(time.Second)
	ps.Close("news")
	wg.Wait()
}

实际场景: 消息传递系统,其中不同的服务订阅某些类型的事件或消息。

5. 带超时机制的选择模式 (Select with Timeout)

将 select 语句与超时一起使用可以避免无限阻塞。当您想要执行操作或在操作花费太长时间时中止时,此模式非常有用。

package main

import (
	"fmt"
	"time"
)

func main() {
	c := make(chan string)

	go func() {
		time.Sleep(2 * time.Second)
		c <- "result"
	}()

	select {
	case res := <-c:
		fmt.Println("Received:", res)
	case <-time.After(1 * time.Second):
		fmt.Println("Timeout")
	}
}

实际场景: 网络客户端尝试连接到服务器,如果服务器未及时响应,则停止。

6. 信号量模式 (Semaphore)

信号量限制了可以并发访问特定资源的 goroutine 的数量。此模式对于控制并发和避免资源过载非常有用。

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, sem chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	sem <- struct{}{} // 获取信号量
	fmt.Printf("Worker %d starting\n", id)
	time.Sleep(time.Second)
	fmt.Printf("Worker %d done\n", id)
	<-sem // 释放信号量
}

func main() {
	const numWorkers = 5
	const maxConcurrent = 2
	sem := make(chan struct{}, maxConcurrent)
	var wg sync.WaitGroup

	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, sem, &wg)
	}

	wg.Wait()
}

实际场景: 数据库连接池,其中一次只允许有限数量的连接。

7. 速率限制模式 (Rate Limiting)

速率限制使用计时器控制事件的处理速率。当您需要控制某些任务的频率时,例如对 API 的请求,此模式非常有用。

package main

import (
	"fmt"
	"time"
)

func main() {
	rate := time.Second
	ticker := time.NewTicker(rate)
	defer ticker.Stop()

	requests := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		requests <- i
	}
	close(requests)

	for req := range requests {
		<-ticker.C // 等待下一个滴答
		fmt.Println("Processing request", req)
	}
}

实际场景: API 网关,它限制用户在给定时间段内可以发出的请求数量。

结论

Go 中的并发模式对于构建高效且可扩展的应用程序至关重要。掌握这些模式将使您能够有效地处理并发,优化资源使用并提高应用程序的性能。