Skip to content

Commit 61693df

Browse files
committed
fix(loki/src/k8s_events): revert interface change + add background actor to set health based on applied task result
Signed-off-by: hainenber <[email protected]>
1 parent 6152c7b commit 61693df

File tree

6 files changed

+49
-40
lines changed

6 files changed

+49
-40
lines changed

component/loki/source/docker/runner.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func newTailer(l log.Logger, task *tailerTask) *tailer {
9494
}
9595
}
9696

97-
func (t *tailer) Run(ctx context.Context) error {
97+
func (t *tailer) Run(ctx context.Context) {
9898
ch, chErr := t.opts.client.ContainerWait(ctx, t.target.Name(), container.WaitConditionNextExit)
9999

100100
t.target.StartIfNotRunning()
@@ -108,10 +108,10 @@ func (t *tailer) Run(ctx context.Context) error {
108108
// refresh.
109109
level.Error(t.log).Log("msg", "could not set up a wait request to the Docker client", "error", err)
110110
t.target.Stop()
111-
return err
111+
return
112112
case <-ch:
113113
t.target.Stop()
114-
return nil
114+
return
115115
}
116116
}
117117

component/loki/source/kubernetes/kubetail/tailer.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ var retailBackoff = backoff.Config{
8989
MaxBackoff: time.Minute,
9090
}
9191

92-
func (t *tailer) Run(ctx context.Context) error {
92+
func (t *tailer) Run(ctx context.Context) {
9393
ctx, cancel := context.WithCancel(ctx)
9494
defer cancel()
9595

@@ -111,22 +111,18 @@ func (t *tailer) Run(ctx context.Context) error {
111111
terminated, err := t.containerTerminated(ctx)
112112
if terminated {
113113
// The container shut down and won't come back; we can stop tailing it.
114-
return nil
114+
return
115115
} else if err != nil {
116116
level.Warn(t.log).Log("msg", "could not determine if container terminated; will retry tailing", "err", err)
117-
return err
118117
}
119118
}
120119

121120
if err != nil {
122121
t.target.Report(time.Now().UTC(), err)
123122
level.Warn(t.log).Log("msg", "tailer stopped; will retry", "err", err)
124-
return err
125123
}
126124
bo.Wait()
127125
}
128-
129-
return nil
130126
}
131127

132128
func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {

component/loki/source/kubernetes_events/event_controller.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"strings"
9+
"sync"
910
"time"
1011

1112
"github.com/cespare/xxhash/v2"
@@ -58,6 +59,9 @@ type eventController struct {
5859

5960
positionsKey string
6061
initTimestamp time.Time
62+
63+
taskErr error
64+
taskErrMut sync.RWMutex
6165
}
6266

6367
func newEventController(task eventControllerTask) *eventController {
@@ -79,18 +83,20 @@ func newEventController(task eventControllerTask) *eventController {
7983
}
8084
}
8185

82-
func (ctrl *eventController) Run(ctx context.Context) error {
86+
func (ctrl *eventController) Run(ctx context.Context) {
8387
defer ctrl.handler.Stop()
8488

8589
level.Info(ctrl.log).Log("msg", "watching events for namespace", "namespace", ctrl.task.Namespace)
8690
defer level.Info(ctrl.log).Log("msg", "stopping watcher for events", "namespace", ctrl.task.Namespace)
8791

88-
if err := ctrl.runError(ctx); err != nil {
92+
err := ctrl.runError(ctx)
93+
if err != nil {
8994
level.Error(ctrl.log).Log("msg", "event watcher exited with error", "err", err)
90-
return err
9195
}
9296

93-
return nil
97+
ctrl.taskErrMut.Lock()
98+
ctrl.taskErr = err
99+
ctrl.taskErrMut.Unlock()
94100
}
95101

96102
func (ctrl *eventController) runError(ctx context.Context) error {
@@ -346,6 +352,12 @@ func (ctrl *eventController) DebugInfo() controllerInfo {
346352
}
347353
}
348354

355+
func (ctrl *eventController) GetTaskError() error {
356+
ctrl.taskErrMut.RLock()
357+
defer ctrl.taskErrMut.RUnlock()
358+
return ctrl.taskErr
359+
}
360+
349361
type controllerInfo struct {
350362
Namespace string `river:"namespace,attr"`
351363
LastTimestamp time.Time `river:"last_event_timestamp,attr"`

component/loki/source/kubernetes_events/kubernetes_events.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,15 +161,30 @@ func (c *Component) Run(ctx context.Context) error {
161161
c.setHealth(err)
162162
level.Error(c.log).Log("msg", "failed to apply event watchers", "err", err)
163163
}
164+
}
165+
}
166+
}, func(_ error) {
167+
cancel()
168+
})
164169

165-
// Check on bubbled up errors encountered by the workers when running applied tasks
166-
// and set component health accordingly
170+
// Actor that polls the applied tasks' errors every 500ms and sets component health accordingly (healthy when no task reports an error).
171+
ticker := time.NewTicker(500 * time.Millisecond)
172+
rg.Add(func() error {
173+
for {
174+
select {
175+
case <-ctx.Done():
176+
return nil
177+
case <-ticker.C:
167178
appliedTaskErrorString := ""
168-
for _, err := range c.runner.GetWorkerErrors() {
169-
appliedTaskErrorString += err.Error() + "\n"
179+
for _, worker := range c.runner.Workers() {
180+
if taskError := worker.(*eventController).GetTaskError(); taskError != nil {
181+
appliedTaskErrorString += taskError.Error() + "\n"
182+
}
170183
}
171184
if appliedTaskErrorString != "" {
172185
c.setHealth(fmt.Errorf(appliedTaskErrorString))
186+
} else {
187+
c.setHealth(nil)
173188
}
174189
}
175190
}

pkg/runner/runner.go

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type Worker interface {
3232
// Run starts a Worker, blocking until the provided ctx is canceled or a
3333
// fatal error occurs. Run is guaranteed to be called exactly once for any
3434
// given Worker.
35-
Run(ctx context.Context) error
35+
Run(ctx context.Context)
3636
}
3737

3838
// The Runner manages a set of running Workers based on an active set of tasks.
@@ -42,10 +42,8 @@ type Runner[TaskType Task] struct {
4242
ctx context.Context
4343
cancel context.CancelFunc
4444

45-
running sync.WaitGroup
46-
workers *hashMap
47-
workerErrs []error
48-
workerErrMut sync.RWMutex
45+
running sync.WaitGroup
46+
workers *hashMap
4947
}
5048

5149
// Internal types used to implement the Runner.
@@ -65,8 +63,9 @@ type (
6563
// workerTask implements Task for it to be used in a hashMap; two workerTasks
6664
// are equal if their underlying Tasks are equal.
6765
workerTask struct {
68-
Worker *scheduledWorker
69-
Task Task
66+
Worker *scheduledWorker
67+
Task Task
68+
TaskErr error
7069
}
7170
)
7271

@@ -165,12 +164,8 @@ func (s *Runner[TaskType]) ApplyTasks(ctx context.Context, tt []TaskType) error
165164
go func() {
166165
defer s.running.Done()
167166
defer close(newWorker.Exited)
168-
// Gather error encountered by worker when running
169-
if err := newWorker.Worker.Run(workerCtx); err != nil {
170-
s.workerErrMut.Lock()
171-
s.workerErrs = append(s.workerErrs, err)
172-
s.workerErrMut.Unlock()
173-
}
167+
// Run the worker until it exits. Run no longer returns an error; a worker that needs to surface a task error records it itself.
168+
newWorker.Worker.Run(workerCtx)
174169
}()
175170

176171
_ = s.workers.Add(newTask)
@@ -210,10 +205,3 @@ func (s *Runner[TaskType]) Stop() {
210205
s.cancel()
211206
s.running.Wait()
212207
}
213-
214-
// GetWorkerErrors returns errors encountered by workers when they run assigned tasks
215-
func (s *Runner[TaskType]) GetWorkerErrors() []error {
216-
s.workerErrMut.RLock()
217-
defer s.workerErrMut.RUnlock()
218-
return s.workerErrs
219-
}

pkg/runner/runner_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,9 @@ type genericWorker struct {
101101

102102
var _ runner.Worker = (*genericWorker)(nil)
103103

104-
func (w *genericWorker) Run(ctx context.Context) error {
104+
func (w *genericWorker) Run(ctx context.Context) {
105105
w.workerCount.Inc()
106106
defer w.workerCount.Dec()
107107

108108
<-ctx.Done()
109-
110-
return nil
111109
}

0 commit comments

Comments
 (0)