在分布式系统和数据库的交互中,并发问题如同暗流般潜伏,稍有不慎就会掀起应用的惊涛骇浪。试想一下,我们正在构建一个股票交易平台,允许不同用户同时购买公司股票。每个公司都有一定数量的可用股票,用户只能在剩余股票充足的情况下进行购买。

Golang 与 Redis 的解决方案:构建稳固的交易系统

为了解决这个问题,我们可以借助 Golang 和 Redis 的强大功能,构建一个安全可靠的交易系统。

数据层搭建:GoRedis 助力高效交互

首先,我们使用 goredis 客户端库创建一个数据层(Repository),用于与 Redis 数据库进行交互:

type Repository struct {
 client *redis.Client
}

var _ go_redis_concurrency.Repository = (*Repository)(nil)

func NewRepository(address, password string) Repository {
 return Repository{
  client: redis.NewClient(&redis.Options{
   Addr:     address,
   Password: password,
  }),
 }
}

购买股票功能实现:并发问题初现端倪

接下来,我们实现 BuyShares 函数,模拟用户购买股票的操作:

func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
 defer wg.Done()

 companySharesKey := BuildCompanySharesKey(companyId)

 // --- (1) ----
 // 获取当前可用股票数量
 currentShares, err := r.client.Get(ctx, companySharesKey).Int()
 if err != nil {
  fmt.Print(err.Error())
  return err
 }

 // --- (2) ----
 // 验证剩余股票是否充足
 if currentShares < numShares {
  fmt.Print("error: 公司剩余股票不足\n")
  return errors.New("error: 公司剩余股票不足")
 }
 currentShares -= numShares

 // --- (3) ----
 // 更新公司可用股票数量
 _, err = r.client.Set(ctx, companySharesKey, currentShares, 0).Result()
 return err
}

该函数包含三个步骤:

  1. 获取公司当前可用股票数量。
  2. 验证剩余股票是否足以满足用户购买需求。
  3. 更新公司可用股票数量。

看似逻辑清晰,但当多个用户并发执行 BuyShares 函数时,问题就出现了。

模拟并发场景:问题暴露无遗

为了模拟并发场景,我们创建多个 Goroutine 同时执行 BuyShares 函数:

const (
 total_clients = 30
)

func main() {
 // --- (1) ----
 // 初始化 Repository
 repository := redis.NewRepository(fmt.Sprintf("%s:%d", config.Redis.Host, config.Redis.Port), config.Redis.Pass)

 // --- (2) ----
 // 并发执行 BuyShares 函数
 companyId := "TestCompanySL"
 var wg sync.WaitGroup
 wg.Add(total_clients)

 for idx := 1; idx <= total_clients; idx++ {
  userId := fmt.Sprintf("user%d", idx)
  go repository.BuyShares(context.Background(), userId, companyId, 100, &wg)
 }
 wg.Wait()

 // --- (3) ----
 // 获取公司剩余股票数量
 shares, err := repository.GetCompanyShares(context.Background(), companyId)
 if err != nil {
  panic(err)
 }
 fmt.Printf("公司 %s 剩余股票数量: %d\n", companyId, shares)
}

假设公司 TestCompanySL 初始拥有 1000 股可用股票,每个用户购买 100 股。我们期望的结果是,只有 10 个用户能够成功购买股票,剩余用户会因为股票不足而收到错误信息。

然而,实际运行结果却出乎意料,公司剩余股票数量可能出现负数,这意味着多个用户在读取可用股票数量时,获取到的是同一个未更新的值,导致最终结果出现偏差。

Redis 并发解决方案:精准打击,逐个击破

为了解决上述并发问题,Redis 提供了多种解决方案,让我们来一一剖析。

原子操作:简单场景下的利器

原子操作能够在不加锁的情况下,保证对数据的修改操作具有原子性。在 Redis 中,可以使用 INCRBY 命令对指定 key 的值进行原子递增或递减。

func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
 defer wg.Done()

 // ... (省略部分代码) ...

 // 使用 INCRBY 命令原子更新股票数量
 _, err = r.client.IncrBy(ctx, companySharesKey, int64(-numShares)).Result()
 return err
}

然而,在我们的股票交易场景中,原子操作并不能完全解决问题。因为在更新股票数量之前,还需要进行剩余股票数量的验证。如果多个用户同时读取到相同的可用股票数量,即使使用原子操作更新,最终结果仍然可能出现错误。

事务:保证操作的原子性

Redis 事务可以将多个命令打包成一个原子操作,要么全部执行成功,要么全部回滚。通过 MULTIEXECDISCARDWATCH 命令,可以实现对数据的原子性操作。

  • MULTI:标记事务块的开始。
  • EXEC:执行事务块中的所有命令。
  • DISCARD:取消事务块,放弃执行所有命令。
  • WATCH:监视指定的 key,如果 key 在事务执行之前被修改,则事务执行失败。

在我们的例子中,可以使用 WATCH 命令监视公司可用股票数量的 key。如果 key 在事务执行之前被修改,则说明有其他用户并发修改了数据,当前事务执行失败,从而保证数据的一致性。

func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
 defer wg.Done()

 companySharesKey := BuildCompanySharesKey(companyId)

 // 使用事务保证操作的原子性
 tx := r.client.TxPipeline()
 tx.Watch(ctx, companySharesKey)

 // ... (省略部分代码) ...

 _, err = tx.Exec(ctx).Result()
 return err
}

然而,在高并发场景下,使用事务可能会导致大量事务执行失败,影响系统性能。

LUA 脚本:将逻辑移至 Redis 服务端执行

为了避免上述问题,可以借助 Redis 的 LUA 脚本功能,将业务逻辑移至 Redis 服务端执行。LUA 脚本在 Redis 中以原子方式执行,可以有效避免并发问题。

local sharesKey = KEYS[1]
local requestedShares = ARGV[1]

local currentShares = redis.call("GET", sharesKey)
if currentShares < requestedShares then
 return {err = "error: 公司剩余股票不足"}
end

currentShares = currentShares - requestedShares
redis.call("SET", sharesKey, currentShares)

该 LUA 脚本实现了与 BuyShares 函数相同的逻辑,包括获取可用股票数量、验证剩余股票是否充足以及更新股票数量。

在 Golang 中,可以使用 goredis 库执行 LUA 脚本:

var BuyShares = redis.NewScript(`
 local sharesKey = KEYS[1]
 local requestedShares = ARGV[1]

 local currentShares = redis.call("GET", sharesKey)
 if currentShares < requestedShares then
  return {err = "error: 公司剩余股票不足"}
 end

 currentShares = currentShares - requestedShares
 redis.call("SET", sharesKey, currentShares)
`)

func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
 defer wg.Done()

 keys := []string{BuildCompanySharesKey(companyId)}
 err := BuyShares.Run(ctx, r.client, keys, numShares).Err()
 if err != nil {
  fmt.Println(err.Error())
 }
 return err
}

使用 LUA 脚本可以有效解决并发问题,并且性能优于事务机制。

分布式锁:灵活控制并发访问

除了 LUA 脚本,还可以使用分布式锁来控制对共享资源的并发访问。Redis 提供了 SETNX 命令,可以实现简单的分布式锁机制。

在 Golang 中,可以使用 redigo 库的 Lock 函数获取分布式锁:

func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
 defer wg.Done()

 companySharesKey := BuildCompanySharesKey(companyId)

 // 获取分布式锁
 lockKey := "lock:" + companySharesKey
 lock, err := r.client.Lock(ctx, lockKey, redislock.Options{
  RetryStrategy: redislock.ExponentialBackoff{
   InitialDuration: time.Millisecond * 100,
   MaxDuration:     time.Second * 3,
  },
 })
 if err != nil {
  return fmt.Errorf("获取分布式锁失败: %w", err)
 }
 defer lock.Unlock(ctx)

 // ... (省略部分代码) ...

 return nil
}

使用分布式锁可以灵活控制并发访问,但需要谨慎处理锁的释放和超时问题,避免出现死锁情况。

总结

Redis 提供了多种解决并发问题的方案,包括原子操作、事务、LUA 脚本和分布式锁等。在实际应用中,需要根据具体场景选择合适的方案。

  • 原子操作适用于简单场景,例如计数器等。
  • 事务可以保证多个操作的原子性,但性能较低。
  • LUA 脚本可以将业务逻辑移至 Redis 服务端执行,性能较高,但需要熟悉 LUA 语法。
  • 分布式锁可以灵活控制并发访问,但需要谨慎处理锁的释放和超时问题。

希望本文能够帮助你更好地理解和解决 Redis 并发问题,构建更加稳定可靠的分布式系统。