最新公告
  • 欢迎您光临码农资源网,本站秉承服务宗旨 履行“站长”责任,销售只是起点 服务永无止境!加入我们
  • 使用通道更快地关闭 goroutine

    使用通道更快地关闭 goroutine

    问题内容

    我是 GO 新手,我有一个关于使用通道信号停止 goroutine 的问题。

    我有一个长期运行的 goroutine(超过 1000 个)和管理器来管理它:

    func myThreadFunc(stop chan bool) {
        for {
            select {
            case <- stop:
                log.Debug("Stopping thread")
                return
            default:
                callClientTask() 
            }
        }
    }
    
    func callClientTask() {
        // This can take long time up to 30 seconds - this is external HTTP API call
        time.Sleep(5 * time.Second)
    }
    
    
    func manager() {
        var cancelChannelSlice []chan bool
        for i := 0; i < 1000; i++ {
            cancelChannel := make(chan bool)
            cancelChannelSlice = append(cancelChannelSlice, cancelChannel)
    
            go myThreadFunc(cancelChannel)
        }
    
        var stopTest = func() {
            for _, c := range cancelChannelSlice {
                c <- true
            }
        }
    
        timeout := time.After(time.Duration(300) * time.Second)
        for {
            select {
            case <-timeout:
                stopTest()
            default:
                time.Sleep(time.Second)
            }
        }
    }

    在这种情况下,每次我调用 c <- true 管理器都会等待 callClientTask() 完成,然后转到下一个 cancelChannel
    我希望所有 goroutine 在 callClientTask() 的 1 次迭代中停止(不超过 30 秒)

    我尝试的唯一方法是像这样投射新的 goroutine:

    var stopTest = func() {
            for _, c := range cancelChannelSlice {
                go func(c chan bool) {
                    c <- true
                    close(c)
                }(c)
            }
        }

    我这是正确的方法吗?

    正确答案

    据我从您的问题中了解到,“您希望所有 goroutine 在 callClientTask() 的 1 次迭代中停止(不超过 30 秒)”并且工作线程同时运行而不会出现同步问题。

    我重新组织了与等待组同时运行的代码。

    示例代码:

    package main
    
    import (
        "log"
        "sync"
        "time"
    )
    
    func worker(stop <-chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()
    
        for {
            select {
            case <-stop:
                log.Println("Stopping thread")
                return
            default:
                callClientTask()
            }
        }
    }
    
    func callClientTask() {
        time.Sleep(2 * time.Second) // simulate a long-running task (for testing purposes)
    }
    
    func main() {
        var wg sync.WaitGroup
        stop := make(chan struct{})
    
        for i := 0; i < 1000; i++ {
            wg.Add(1)
            go worker(stop, &wg)
        }
    
        time.Sleep(5 * time.Second) // allow workers to run for a while
        close(stop)                 // stop all workers, close channel
        wg.Wait()                   // wait for all workers
    }

    输出:

    2023/10/26 10:40:44 Stopping thread
    2023/10/26 10:40:44 Stopping thread
    ....
    2023/10/26 10:40:49 Stopping thread
    2023/10/26 10:40:49 Stopping thread

    编辑:

    如果您想停止某些工作人员,则必须更新工作人员。以下代码包括具有“停止”和“停止”通道的工作人员以及启动/停止功能。

    示例代码:

    package main
    
    import (
        "log"
        "sync"
        "time"
    )
    
    type Worker struct {
        stop    chan struct{}
        stopped chan struct{}
    }
    
    func NewWorker() *Worker {
        return &Worker{
            stop:    make(chan struct{}),
            stopped: make(chan struct{}),
        }
    }
    
    func (w *Worker) Start(wg *sync.WaitGroup) {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case <-w.stop:
                    log.Println("Stopping thread")
                    close(w.stopped)
                    return
                default:
                    callClientTask()
                }
            }
        }()
    }
    
    func (w *Worker) Stop() {
        close(w.stop)
        <-w.stopped
    }
    
    func callClientTask() {
        time.Sleep(2 * time.Second) // simulate a long-running task (for testing purposes)
    }
    
    func main() {
        var wg sync.WaitGroup
        workers := make([]*Worker, 1000)
    
        for i := 0; i < 1000; i++ {
            workers[i] = NewWorker()
            workers[i].Start(&wg)
        }
    
        time.Sleep(5 * time.Second) // allow workers to run for a while 
        for i := 0; i < 100; i++ { // stop  first 100 workers
            workers[i].Stop()
        }  
        for i := 100; i < 1000; i++ { // wait other workers to finish
            workers[i].Stop()
        }
        wg.Wait()
    }

    输出:

    2023/10/26 12:51:26 Stopping thread
    2023/10/26 12:51:28 Stopping thread
    2023/10/26 12:51:30 Stopping thread
    ....
    想要了解更多内容,请持续关注码农资源网,一起探索发现编程世界的无限可能!
    本站部分资源来源于网络,仅限用于学习和研究目的,请勿用于其他用途。
    如有侵权请发送邮件至1943759704@qq.com删除

    码农资源网 » 使用通道更快地关闭 goroutine
    • 7会员总数(位)
    • 25846资源总数(个)
    • 0本周发布(个)
    • 0 今日发布(个)
    • 293稳定运行(天)

    提供最优质的资源集合

    立即查看 了解详情