Skip to content

Commit 29114b8

Browse files
committed
feat(pkg/runner): collect worker errors + a method to get them
Signed-off-by: hainenber <[email protected]>
1 parent 0bc51a6 commit 29114b8

File tree

1 file changed

+17
-3
lines changed

1 file changed

+17
-3
lines changed

pkg/runner/runner.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@ type Runner[TaskType Task] struct {
4242
ctx context.Context
4343
cancel context.CancelFunc
4444

45-
running sync.WaitGroup
46-
workers *hashMap
45+
running sync.WaitGroup
46+
workers *hashMap
47+
workerErrs []error
48+
workerErrMut sync.RWMutex
4749
}
4850

4951
// Internal types used to implement the Runner.
@@ -163,7 +165,12 @@ func (s *Runner[TaskType]) ApplyTasks(ctx context.Context, tt []TaskType) error
163165
go func() {
164166
defer s.running.Done()
165167
defer close(newWorker.Exited)
166-
newWorker.Worker.Run(workerCtx)
168+
// Gather error encountered by worker when running
169+
if err := newWorker.Worker.Run(workerCtx); err != nil {
170+
s.workerErrMut.Lock()
171+
s.workerErrs = append(s.workerErrs, err)
172+
s.workerErrMut.Unlock()
173+
}
167174
}()
168175

169176
_ = s.workers.Add(newTask)
@@ -203,3 +210,10 @@ func (s *Runner[TaskType]) Stop() {
203210
s.cancel()
204211
s.running.Wait()
205212
}
213+
214+
// GetWorkerErrors returns errors encountered by workers when they run assigned tasks
215+
func (s *Runner[TaskType]) GetWorkerErrors() []error {
216+
s.workerErrMut.RLock()
217+
defer s.workerErrMut.RUnlock()
218+
return s.workerErrs
219+
}

0 commit comments

Comments
 (0)