There is a code:
func main() {
rand.Seed(time.Now().UnixNano())
res, err := MockFunc()
fmt.Printf("res - %v, err - %v\n", res, err)
}
func MockFunc() ([]int, error) {
args := []int{1, 2, 3, 4, 5}
result := make([]int, 0)
errCh := make(chan error)
wg := &sync.WaitGroup{}
wg.Add(len(args))
for _, a := range args {
go func(aLit int) {
value, err := RandomError(aLit)
if err != nil {
errCh <- err
return
}
result = append(result, value)
}(a)
}
errValue := <-errCh // if no errors, i have panic - fatal error: all goroutines are asleep - deadlock!
if errValue != nil {
fmt.Println("returning because err")
return nil, errValue // if RandomError() returns error, returned here as expected
}
wg.Wait()
return result, nil
}
func RandomError(arg int) (int, error) {
time.Sleep(time.Millisecond * 100 * time.Duration(arg))
errChance := rand.Intn(100)
if errChance > 40 {
fmt.Printf("error on arg - %d\n", arg)
return 0, errors.New("mock err")
}
return errChance, nil
}
I need that if the RandomError() function returns an error, then the MockFunc() function completed before waiting for all waitgroups and returned an error. But if there are no errors, then I get a deadlock, if there is, then everything works as expected.
I understand that this is because I am not closing the channel. But if I close it after wg.Wait(), then the meaning of this will be lost, since if the first call to the function returned an error, then I will wait for the results of all the other calls.
I need it so that if one of the calls to RandomeErr() returned an error, I returned this error from MockFunc() to main() without waiting for the end of all calls.
CodePudding user response:
You have three deadlocks:
- errValue := <-errCh // if no errors, i have panic - fatal error: all goroutines are asleep - deadlock!
- wg.Wait() # line 44
- errCh <- err # line 29
the second one never happend because of return statement or first deadlock. for fixing deadlock 1, we must read channel with non-blocking mode. for fixing deadlock 2, we must use wg.Done in each goroutine. for fixing deadlock 3, we must use buffered channel or somehow we must consume the channel. here for simplicity I choose buffered channel, but we can read errCh inside for loop and return error if we saw error.
package main
import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
res, err := MockFunc()
fmt.Printf("res - %v, err - %v\n", res, err)
}
func MockFunc() ([]int, error) {
args := []int{1, 2, 3, 4, 5}
result := make([]int, 0)
errCh := make(chan error, len(args))
wg := &sync.WaitGroup{}
wg.Add(len(args))
for _, a := range args {
go func(aLit int) {
defer wg.Done()
value, err := RandomError(aLit)
if err != nil {
errCh <- err
return
}
result = append(result, value)
}(a)
}
var errValue error
select {
case errValue = <-errCh: // if no errors, i have panic - fatal error: all goroutines are asleep - deadlock!
default:
}
if errValue != nil {
fmt.Println("returning because err")
return nil, errValue // if RandomError() returns error, returned here as expected
}
wg.Wait()
return result, nil
}
func RandomError(arg int) (int, error) {
time.Sleep(time.Millisecond * 100 * time.Duration(arg))
errChance := rand.Intn(100)
if errChance > 40 {
fmt.Printf("error on arg - %d\n", arg)
return 0, errors.New("mock err")
}
return errChance, nil
}
CodePudding user response:
Might be easier to use errgroup.Group.
I also had to create another goroutine and channel to read the results into the array. If you use result = append(result, value) within many goroutines you'll get an error at some point.
One of the benefits of using errgroup.Group is that all goroutines must be cleaned up and stopped before the function will return. This helps prevent memory leaks when these kinds of functions are part of larger system.
package main
import (
"context"
"errors"
"fmt"
"math/rand"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
rand.Seed(time.Now().UnixNano())
res, err := MockFunc()
fmt.Printf("res - %v, err - %v\n", res, err)
}
func MockFunc() ([]int, error) {
args := []int{1, 2, 3, 4, 5}
resultChan := make(chan int)
result := make([]int, 0)
eg, ctx := errgroup.WithContext(context.Background())
// Read all results into an array
eg.Go(func() error {
for {
select {
case <-ctx.Done():
// Always check to see if the context has cancelled,
// if there is an error errgroup will cancel the
// context and all goroutines will need to exit
// before `eg.Wait` returns.
return context.Canceled
case val := <-resultChan:
result = append(result, val)
if len(result) == len(args) {
return nil
}
}
}
})
for _, a := range args {
aLit := a // Copy the value so that we don't re-use the memory address
eg.Go(func() error {
value, err := RandomError(aLit)
if err != nil {
return err
}
select {
case resultChan <- value:
case <-ctx.Done():
return context.Canceled
default:
}
return nil
})
}
return result, eg.Wait()
}
func RandomError(arg int) (int, error) {
time.Sleep(time.Millisecond * 100 * time.Duration(arg))
errChance := rand.Intn(100)
if errChance > 40 {
fmt.Printf("error on arg - %d\n", arg)
return 0, errors.New("mock err")
}
return errChance, nil
}
Can also be simpler if you buffer all the results into a channel:
func MockFunc() ([]int, error) {
args := []int{1, 2, 3, 4, 5}
resultChan := make(chan int, len(args))
result := make([]int, 0)
eg, ctx := errgroup.WithContext(context.Background())
for _, a := range args {
aLit := a // Copy the value so that we don't re-use the memory address
eg.Go(func() error {
value, err := RandomError(aLit)
if err != nil {
return err
}
select {
case resultChan <- value:
case <-ctx.Done():
return context.Canceled
default:
}
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
close(resultChan)
for val := range resultChan {
result = append(result, val)
}
return result, nil
}
And here is the simplest possible implementation I could think of:
func MockFunc() (result []int, err error) {
args := []int{1, 2, 3, 4, 5}
resultChan := make(chan int)
errChan := make(chan error)
for _, a := range args {
go func(aLit int) {
value, err := RandomError(aLit)
if err != nil {
errChan <- err
}
resultChan <- value
}(a)
}
for i := 0; i < len(args); i {
select {
case err := <-errChan:
return nil, err
case val := <-resultChan:
result = append(result, val)
}
}
return result, nil
}
