Home > Mobile >  Channel won't receive message sent from inside Go Routine
Channel won't receive message sent from inside Go Routine

Time:01-10

I'm trying to get a better understanding of channel in GO.

Want 5 routines to be running at all times. At specific times during the first routine, I want to try to start another routine. Assuming 5 routines are already running, I want to queue up the next routine and run it as soon as one of the other routines has been completed.

My logic was call pass a message to spawner, check to see if there are 5 processes already running, if so, keep waiting until there isn't, and start up. From what I can tell, is is p.complete <- struct{}{} isn't working as expecting and removing a process. It works fine outside of the go routine.

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
)

type ProcessManager struct {
    spawner  chan struct{}
    complete chan struct{}
    process  int
}

func NewProcessManager() *ProcessManager {
    return &ProcessManager{
        spawner:  make(chan struct{}),
        complete: make(chan struct{}),
        process:  0,
    }
}

func (p *ProcessManager) Run(limit int) {
    for {
        select {
        case <-p.spawner:
            for {
                if p.process <= limit {
                    fmt.Println("breaking for new process")
                    break
                }
                time.Sleep(time.Second * 10)
            }
            p.process  

            go func() {
                fmt.Println("  Starting goroutine")
                p.spawner <- struct{}{}
                time.Sleep(time.Second * 2)
                fmt.Println("- Stopping goroutine")
                p.complete <- struct{}{}

            }()

        case <-p.complete:
            fmt.Println("complete")
            p.process--
        }
    }
}
func main() {
    interruptChannel := make(chan os.Signal, 1)
    signal.Notify(interruptChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
    pm := NewProcessManager()
    go pm.Run(5)
    pm.spawner <- struct{}{}
    <-interruptChannel
}

CodePudding user response:

The whole purpose of the ProcessManager in my eyes is to serialize access to the metadata keeping track of running processes. As such, it runs sequential.

case <-p.spawner:
    for {
        if p.process <= limit {
            fmt.Println("breaking for new process")
            break
        }
        time.Sleep(time.Second * 10)
    }

In this section of code, the goroutine is sleeping. While sleeping, the complete case in the select statement cannot run.

Recall the following property of unbuffered channels:

By default, sends and receives block until the other side is ready.

Because the channels are unbuffered, this line of code must block until that complete case is triggered:

p.complete <- struct{}{}

CodePudding user response:

Managed to solve this by switching process to process: make(chan int, 4)

Then I just used this to block instead of the for loop p.process <- 1 and then use this to mark a routine as completed <-p.process

The length of the channel (4) will determine the max number of processed allowed to run at any given time. Updated test I provided below:

func NewProcessManager() *ProcessManager {
    return &ProcessManager{
        spawner:  make(chan struct{}),
        complete: make(chan struct{}),
        process:  make(chan int, 4),
    }
}

func (p *ProcessManager) Run() {
    for {
        select {
        case <-p.spawner:
            p.process <- 1
            go func() {
                fmt.Println("  Starting goroutine")
                // do stuff before starting next routine
                p.spawner <- struct{}{}
                // do stuff rest of stuff
                fmt.Println("- Stopping goroutine")
                <-p.process

            }()
    }
}
  •  Tags:  
  • Related