Home > Mobile >  Execute multiple independent jobs continuously
Execute multiple independent jobs continuously

Time:02-06

I have a set of jobs which are independent of each other. Hence each of these jobs can be run concurrently using goroutines. Note that once a single job completes, it should wait for a few seconds and start again (this applies to all the jobs), and this goes on in a loop until the Go API service stops. Also note that all these jobs execute the same function in a goroutine (it makes a REST call). What would be the best pattern to implement this in Go? Please note that I also want to wait for the currently executing jobs to complete before my service shuts down.

CodePudding user response:

If I understand correctly, you are looking for something like this.

This code runs the workers in a loop. In each iteration the workers run in parallel as a group; this repeats until you stop the program by sending it a termination signal, at which point it waits for the current iteration to finish before exiting.

// main builds a server configured with five workers and hands control to
// its Run method.
func main() {
    const workerCount = 5

    srv := &server{workers: workerCount}
    srv.Run()
}

// work simulates a single job for worker i: it prints a start message,
// sleeps for a random whole number of seconds in [0, 10), prints a finish
// message, and finally marks itself done on wg.
//
// wg must already have been incremented (wg.Add(1)) by the caller for this
// invocation.
//
// inspired by: https://goinbigdata.com/golang-wait-for-all-goroutines-to-finish/#:~:text=A WaitGroup allows to wait,until all goroutines have finished.
func work(wg *sync.WaitGroup, i int) {
    defer wg.Done()

    // Use a private generator rather than calling rand.Seed on every
    // invocation: re-seeding the shared global source from several
    // goroutines resets it for every other user, and rand.Seed is
    // deprecated.
    rng := rand.New(rand.NewSource(time.Now().UnixNano()))
    n := rng.Intn(10)

    fmt.Printf("Worker %v: Started\n", i)
    time.Sleep(time.Duration(n) * time.Second)
    fmt.Printf("Worker %v: Finished\n", i)
}

// server runs a fixed-size group of workers in repeated rounds (see Run).
type server struct {
    // running is true while Run is executing rounds of workers.
    running bool
    // workers is the number of goroutines Run starts in each round.
    workers int
}

// Run executes rounds of srv.workers concurrent work goroutines until the
// process receives SIGINT or SIGTERM.
//
// Each round starts srv.workers goroutines and waits for all of them to
// finish before the next round begins. A termination signal is only acted
// upon between rounds, so the round in progress always runs to completion
// before Run returns.
func (srv *server) Run() {
    // Buffered so that a signal delivered while a round is in progress is
    // not dropped by the signal package.
    signalCh := make(chan os.Signal, 1)
    signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
    defer signal.Stop(signalCh)

    // srv.running is only touched from this goroutine; the stop request
    // itself travels over signalCh, so there is no unsynchronised access.
    srv.running = true
    defer func() { srv.running = false }()

    // With nothing to run, block until asked to stop instead of spinning.
    if srv.workers <= 0 {
        <-signalCh
        return
    }

    for {
        // Non-blocking check for a pending stop request before each round.
        select {
        case <-signalCh:
            return
        default:
        }

        var wg sync.WaitGroup
        for i := 0; i < srv.workers; i++ {
            wg.Add(1)
            go work(&wg, i)
        }
        wg.Wait()
    }
}

CodePudding user response:

If I got you right, you are looking for something like this. This is a service with a consumer pool that executes jobs concurrently. When a job is done, it is repeated after an interval until you stop the service.

// job is one unit of work handed to a service.
type job struct {
    // id identifies the job; it is printed when the job is done and reused
    // when the job is submitted again.
    id     int
    // result receives the outcome of the job from the consumer that
    // processed it.
    result chan error
}

// newJob creates the job with the given id. Its result channel has room for
// one value, so whoever reports the outcome does not have to wait for a
// reader.
func newJob(id int) job {
    var j job
    j.id = id
    j.result = make(chan error, 1)
    return j
}

// service owns a pool of consumer goroutines that take jobs from a shared
// queue and re-submit each job after it completes.
type service struct {
    // pending is the queue of jobs waiting to be picked up by a consumer.
    pending chan job

    // consumerLimit is the number of consumer goroutines in the pool.
    consumerLimit  int
    // repeatInterval is how long a finished job waits before it is
    // submitted again.
    repeatInterval time.Duration

    // isClosed is closed by close; do reports an error once it is closed.
    isClosed chan struct{}
    // shutdown carries, from close to each consumer, the channel on which
    // that consumer acknowledges that it is stopping.
    shutdown chan chan error
}

// newService creates a service and immediately starts its consumer pool.
//
// repeatInterval is the delay before a completed job is submitted again,
// consumerLimit is the number of consumer goroutines to start, and
// pendingChannelSize is the capacity of the queue of waiting jobs.
func newService(repeatInterval time.Duration, consumerLimit int, pendingChannelSize int) *service {
    s := &service{
        pending:        make(chan job, pendingChannelSize),
        consumerLimit:  consumerLimit,
        repeatInterval: repeatInterval,
        // isClosed is a pure close-to-broadcast signal, so it needs no buffer.
        isClosed: make(chan struct{}),
        // Unbuffered on purpose: a send only succeeds once a consumer has
        // actually received the shutdown request.
        shutdown: make(chan chan error),
    }

    for i := 0; i < s.consumerLimit; i++ {
        go s.consumer()
    }

    return s
}

// do submits job to the service and waits for its result.
//
// It returns the value a consumer sends on job.result, ctx.Err() if ctx ends
// before the job has been queued or completed, or an error if the service
// has been shut down.
func (s *service) do(ctx context.Context, job job) error {
    errShutDown := errors.New("service has been shut down")

    // select chooses at random among ready cases, so check for shutdown
    // first: otherwise a job could be queued after every consumer has
    // exited and its result would never arrive.
    select {
    case <-s.isClosed:
        return errShutDown
    default:
    }

    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-s.isClosed:
        return errShutDown
    case s.pending <- job:
    }

    // Keep honouring ctx and shutdown while the job is queued or running so
    // the caller can never block forever.
    select {
    case err := <-job.result:
        return err
    case <-ctx.Done():
        return ctx.Err()
    case <-s.isClosed:
        // A consumer may have finished the job just before shutdown;
        // prefer its result when one is already available.
        select {
        case err := <-job.result:
            return err
        default:
            return errShutDown
        }
    }
}

func (s *service) consumer() {
    for {
        select {
        case j := <-s.pending:
            //Simulate working process
            time.Sleep(time.Duration(rand.Intn(200))   200)
            j.result <- nil
            fmt.Println(fmt.Sprintf("job %v is done", j.id))

            go func() {
                //Repeat after a time
                time.Sleep(s.repeatInterval)
                ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
                defer cancel()
                if err := s.do(ctx, newJob(j.id)); err != nil {
                    fmt.Println(fmt.Errorf("failed to send job to repeat: %v", err))
                }
            }()
        case result := <-s.shutdown:
            result <- nil
            return
        }
    }
}

// close stops every consumer and marks the service as closed.
//
// A consumer only sees the shutdown request between jobs, so a job that is
// currently being processed is finished first. close returns the first
// non-nil acknowledgement, or nil if every consumer stopped cleanly. It must
// be called at most once.
func (s *service) close() error {
    // One buffer slot per consumer: each consumer sends exactly one
    // acknowledgement, and none of them may be left blocked on that send.
    acks := make(chan error, s.consumerLimit)
    for i := 0; i < s.consumerLimit; i++ {
        s.shutdown <- acks
    }
    close(s.isClosed)

    var firstErr error
    for i := 0; i < s.consumerLimit; i++ {
        if err := <-acks; err != nil && firstErr == nil {
            firstErr = err
        }
    }
    return firstErr
}

// main starts a service with five consumers, submits jobs 1 through 9, and
// then blocks until the process is interrupted, at which point it shuts the
// service down and reports the outcome.
func main() {
    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)

    // Named svc so the variable does not shadow the service type.
    svc := newService(time.Second, 5, 1000)

    // Assign jobs. Each submission runs in its own goroutine because do
    // blocks until the job's result is available.
    for i := 1; i < 10; i++ {
        go func(i int) {
            if err := svc.do(context.Background(), newJob(i)); err != nil {
                fmt.Printf("failed to send job: %v\n", err)
            }
        }(i)
    }

    <-interrupt
    if err := svc.close(); err != nil {
        fmt.Printf("failed to graceful shut down service: %v\n", err)
        return
    }
    fmt.Println("service has been shutdown successfully")
}
  •  Tags:  
  • Related