I'm trying to get a better understanding of channels in Go.
I want 5 goroutines to be running at all times. At specific points during the first goroutine, I want to try to start another one. If 5 goroutines are already running, I want to queue up the next one and run it as soon as one of the others has completed.
My logic was to pass a message to spawner, check whether 5 processes are already running, and if so keep waiting until one has finished before starting the new one. From what I can tell, p.complete <- struct{}{} isn't working as expected and never removes a process. It works fine outside of the goroutine.
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

type ProcessManager struct {
	spawner  chan struct{}
	complete chan struct{}
	process  int
}

func NewProcessManager() *ProcessManager {
	return &ProcessManager{
		spawner:  make(chan struct{}),
		complete: make(chan struct{}),
		process:  0,
	}
}

func (p *ProcessManager) Run(limit int) {
	for {
		select {
		case <-p.spawner:
			for {
				if p.process <= limit {
					fmt.Println("breaking for new process")
					break
				}
				time.Sleep(time.Second * 10)
			}
			p.process++
			go func() {
				fmt.Println(" Starting goroutine")
				p.spawner <- struct{}{}
				time.Sleep(time.Second * 2)
				fmt.Println("- Stopping goroutine")
				p.complete <- struct{}{}
			}()
		case <-p.complete:
			fmt.Println("complete")
			p.process--
		}
	}
}

func main() {
	interruptChannel := make(chan os.Signal, 1)
	signal.Notify(interruptChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	pm := NewProcessManager()
	go pm.Run(5)
	pm.spawner <- struct{}{}
	<-interruptChannel
}
CodePudding user response:
As I see it, the whole purpose of the ProcessManager is to serialize access to the metadata that keeps track of running processes. As such, its Run loop executes sequentially: it handles one select case at a time.
case <-p.spawner:
	for {
		if p.process <= limit {
			fmt.Println("breaking for new process")
			break
		}
		time.Sleep(time.Second * 10)
	}
In this section of code, the goroutine executing Run is sleeping inside the spawner case. While it sleeps, the complete case of the select statement cannot run.
Recall the following property of unbuffered channels:
By default, sends and receives block until the other side is ready.
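Because the channels are unbuffered, this line of code must block until the complete case is reached. That never happens while Run is stuck in the inner loop, so p.process is never decremented and the loop never exits:
p.complete <- struct{}{}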
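One way to act on this diagnosis is to keep the manager loop from ever sleeping: count queued requests and start them whenever a slot frees up. The following is only a minimal sketch under that assumption, not the asker's code. The names manager, newManager, run, pending and peak are hypothetical, limit is assumed to be positive, and the total parameter exists only so the example terminates.

```go
package main

import (
	"fmt"
	"time"
)

// manager is a hypothetical stand-in for ProcessManager.
type manager struct {
	spawn    chan struct{}
	complete chan struct{}
}

func newManager() *manager {
	return &manager{
		spawn:    make(chan struct{}),
		complete: make(chan struct{}),
	}
}

// run never sleeps inside a select case, so it can always receive from
// complete. It returns the peak number of workers that ran at once.
func (m *manager) run(limit, total int, work func()) int {
	running, pending, finished, peak := 0, 0, 0, 0
	for finished < total {
		select {
		case <-m.spawn:
			pending++ // queue the request instead of waiting in place
		case <-m.complete:
			running--
			finished++
		}
		// Start queued workers while there are free slots.
		for pending > 0 && running < limit {
			pending--
			running++
			if running > peak {
				peak = running
			}
			go func() {
				work()
				m.complete <- struct{}{}
			}()
		}
	}
	return peak
}

func main() {
	m := newManager()
	const total = 12
	go func() {
		for i := 0; i < total; i++ {
			m.spawn <- struct{}{}
		}
	}()
	peak := m.run(5, total, func() { time.Sleep(10 * time.Millisecond) })
	fmt.Println("peak concurrent workers:", peak)
}
```

Because all counters are touched only by the goroutine executing run, no mutex is needed; the channels are the only shared state.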
CodePudding user response:
I managed to solve this by changing process to a buffered channel: process: make(chan int, 4)
Then, instead of the for loop, I block with p.process <- 1
and mark a routine as completed with <-p.process
The capacity of the channel (4) determines the maximum number of processes allowed to run at any given time. The updated test is below:
func NewProcessManager() *ProcessManager {
	return &ProcessManager{
		spawner:  make(chan struct{}),
		complete: make(chan struct{}),
		// process is now a buffered channel, so the struct field must be declared as chan int.
		process: make(chan int, 4),
	}
}

func (p *ProcessManager) Run() {
	for {
		select {
		case <-p.spawner:
			// Acquire a slot; this blocks while 4 goroutines are already running.
			p.process <- 1
			go func() {
				fmt.Println(" Starting goroutine")
				// do stuff before starting next routine
				p.spawner <- struct{}{}
				// do the rest of the stuff
				fmt.Println("- Stopping goroutine")
				// Release the slot so a queued goroutine can start.
				<-p.process
			}()
		}
	}
}
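The buffered channel in this answer acts as a counting semaphore. Below is a self-contained sketch of the same idea, assuming a fixed batch of jobs rather than the spawner chain. The names runLimited, sem and peak are hypothetical, and struct{} tokens stand in for the int values used above (the value sent is never read, so either works).

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited starts total jobs but lets at most limit of them run at once.
// It returns the highest number of jobs observed running concurrently.
func runLimited(total, limit int, job func(id int)) int {
	sem := make(chan struct{}, limit) // capacity == max concurrent jobs
	var wg sync.WaitGroup
	var mu sync.Mutex
	running, peak := 0, 0

	for id := 0; id < total; id++ {
		sem <- struct{}{} // acquire a slot; blocks while limit jobs are running
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the job returns

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			job(id)

			mu.Lock()
			running--
			mu.Unlock()
		}(id)
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runLimited(20, 4, func(id int) {
		time.Sleep(10 * time.Millisecond) // stand-in for real work
	})
	fmt.Println("peak concurrent jobs:", peak)
}
```

The slot is acquired before the goroutine starts and released only after the running counter has been decremented, so the observed peak cannot exceed limit.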
