最新公告
  • 欢迎您光临码农资源网,本站秉承服务宗旨 履行“站长”责任,销售只是起点 服务永无止境!加入我们
  • golang框架如何利用协程实现限流和熔断?

    golang 分布式系统中,协程可用于实现限流和熔断。限流通过令牌桶算法限制并发访问量,熔断则采用断路器模式,当故障频繁时暂时停止访问。限流和熔断机制可防止系统过载或崩溃,保证系统的稳定和响应能力。

    golang框架如何利用协程实现限流和熔断?

    利用 Go 协程实现限流和熔断

    在分布式系统中,限制对资源的并发访问量和处理故障非常重要。Golang 中的协程提供了一个轻量级的并行机制,可用于轻松实现限流和熔断。

    限流

    立即学习go语言免费学习笔记(深入)”;

    限制对资源的并发访问量可以防止系统因过载而崩溃。可以使用令牌桶算法来实现限流:

    package main
    
    import (
        "context"
        "fmt"
        "runtime"
        "sync"
        "sync/atomic"
        "time"
    )
    
    func main() {
        // 令牌生成速率(令牌/秒)
        rate := 100
    
        // 令牌桶容量
        capacity := 1000
    
        // 创建令牌桶
        bucket := NewTokenBucket(rate, capacity)
    
        // 模拟并发请求
        wg := sync.WaitGroup{}
        for i := 0; i < 1000; i++ {
            wg.Add(1)
            go func(requestID int) {
                defer wg.Done()
    
                if !bucket.TryAcquire() {
                    // 如果无法获取令牌,则丢弃请求
                    fmt.Println("Request", requestID, "dropped due to rate limiting")
                } else {
                    // 处理请求
                    fmt.Println("Request", requestID, "processed")
                }
            }(i)
        }
    
        wg.Wait()
    }
    
    // 令牌桶
    type TokenBucket struct {
        rate     int
        capacity int
    
        tokens      int64
        lastUpdated time.Time
        lock        sync.RWMutex
    }
    
    // NewTokenBucket 创建一个新的令牌桶
    func NewTokenBucket(rate int, capacity int) *TokenBucket {
        bucket := &TokenBucket{
            rate:     rate,
            capacity: capacity,
    
            tokens:      capacity,
            lastUpdated: time.Now(),
        }
    
        // 启动定时任务更新令牌
        go bucket.Tick()
    
        return bucket
    }
    
    // TryAcquire 尝试获取一个令牌
    func (b *TokenBucket) TryAcquire() bool {
        for {
            b.lock.Lock()
            tokens := b.tokens
    
            // 计算自上次更新以来经过的时间
            elapsed := time.Since(b.lastUpdated)
            // 根据时间更新令牌
            newTokens := b.rate * int(elapsed.Seconds())
            // 更新令牌量
            tokens += newTokens
            // 确保令牌量不超过容量
            tokens = min(tokens, b.capacity)
            // 更新最后更新时间
            b.lastUpdated = time.Now()
    
            if tokens > 0 {
                tokens--
                atomic.StoreInt64(&b.tokens, tokens)
                b.lock.Unlock()
                return true
            }
    
            b.lock.Unlock()
            return false
        }
    }
    
    // Tick 定时任务更新令牌
    func (b *TokenBucket) Tick() {
        ticker := time.NewTicker(100 * time.Millisecond)
        for {
            select {
            case <-ticker.C:
                b.TryAcquire()
            }
        }
    }
    
    func min(a, b int) int {
        if a < b {
            return a
        }
        return b
    }

    熔断

    熔断是指当资源不可用或响应速度过慢时,临时停止对该资源的访问。这可以防止不必要的请求堆积,从而导致系统崩溃。可以使用断路器模式来实现熔断:

    package main
    
    import (
        "context"
        "fmt"
        "sync/atomic"
        "time"
    )
    
    func main() {
        // 连续失败的请求次数阈值
        failureThreshold := 5
    
        // 熔断持续时间(秒)
        timeout := 30
    
        // 创建熔断器
        breaker := NewCircuitBreaker(failureThreshold, timeout)
    
        // 模拟并发请求
        wg := sync.WaitGroup{}
        for i := 0; i < 1000; i++ {
            wg.Add(1)
            go func(requestID int) {
                defer wg.Done()
    
                // 尝试执行请求
                if breaker.Call(func() error {
                    // 实际的请求处理
                    return nil
                }) {
                    // 请求成功
                    fmt.Println("Request", requestID, "processed")
                } else {
                    // 请求被熔断
                    fmt.Println("Request", requestID, "dropped due to circuit breaker")
                }
            }(i)
        }
    
        wg.Wait()
    }
    
    // 熔断器
    type CircuitBreaker struct {
        failureThreshold int
        timeout          time.Duration
    
        state             atomic.Value
        lastFailureAt     atomic.Value
        failureCount      int32
        resetTimerStarted bool
    }
    
    // NewCircuitBreaker 创建一个新的熔断器
    func NewCircuitBreaker(failureThreshold int, timeout time.Duration) *CircuitBreaker {
        breaker := &CircuitBreaker{
            failureThreshold: failureThreshold,
            timeout:          timeout,
        }
    
        // 初始化熔断器状态
        breaker.SetState(Closed)
    
        return breaker
    }
    
    // SetState 设置熔断器状态
    func (b *CircuitBreaker) SetState(state State) {
        b.state.Store(state)
    }
    
    // State 获取熔断器状态
    func (b *CircuitBreaker) State() State {
        return b.state.Load()
    }
    
    // Call 执行受熔断器保护的函数
    func (b *CircuitBreaker) Call(f func() error) error {
        state := b.State()
    
        switch state {
        case Closed:
            // 熔断器已关闭,尝试执行函数
            return b.execute(f)
        case Open:
            // 熔断器已打开,直接返回错误
            return ErrCircuitOpen
        case HalfOpen:
            // 熔断器处于半开状态,尝试执行函数并更新熔断器状态
            if err := b.execute(f); err != nil {
                b.SetState(Open)
                return err
            } else {
                b.SetState(Closed)
                return nil
            }
        default:
            return ErrUnknownState
        }
    }
    
    // execute 执行函数并更新熔断器状态
    func (b *CircuitBreaker) execute(f func() error) error {
        // 记录函数调用时间
    想要了解更多内容,请持续关注码农资源网,一起探索发现编程世界的无限可能!
    本站部分资源来源于网络,仅限用于学习和研究目的,请勿用于其他用途。
    如有侵权请发送邮件至1943759704@qq.com删除

    码农资源网 » golang框架如何利用协程实现限流和熔断?
    • 7会员总数(位)
    • 25846资源总数(个)
    • 0本周发布(个)
    • 0 今日发布(个)
    • 294稳定运行(天)

    提供最优质的资源集合

    立即查看 了解详情