Skip to content

Commit c706fbb

Browse files
committed
feat: initial event sink implementation
1 parent 68127b9 commit c706fbb

File tree

12 files changed

+498
-3
lines changed

12 files changed

+498
-3
lines changed

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ require (
8888
github.com/beorn7/perks v1.0.1 // indirect
8989
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
9090
github.com/cespare/xxhash/v2 v2.2.0 // indirect
91+
github.com/cloudevents/sdk-go/v2 v2.15.0 // indirect
9192
github.com/containerd/continuity v0.4.3 // indirect
9293
github.com/coreos/go-iptables v0.7.0 // indirect
9394
github.com/danieljoos/wincred v1.1.2 // indirect
@@ -132,6 +133,7 @@ require (
132133
github.com/jmespath/go-jmespath v0.4.0 // indirect
133134
github.com/josharian/native v1.1.1-0.20230202152459-5c7d0dd6ab86 // indirect
134135
github.com/jsimonetti/rtnetlink v1.4.0 // indirect
136+
github.com/json-iterator/go v1.1.12 // indirect
135137
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
136138
github.com/kylelemons/godebug v1.1.0 // indirect
137139
github.com/labstack/gommon v0.4.2 // indirect
@@ -145,6 +147,8 @@ require (
145147
github.com/miekg/dns v1.1.57 // indirect
146148
github.com/mitchellh/mapstructure v1.5.0 // indirect
147149
github.com/moby/term v0.5.0 // indirect
150+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
151+
github.com/modern-go/reflect2 v1.0.2 // indirect
148152
github.com/mtibben/percent v0.2.1 // indirect
149153
github.com/opencontainers/go-digest v1.0.0 // indirect
150154
github.com/opencontainers/image-spec v1.1.0-rc6 // indirect

go.sum

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,10 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
291291
github.com/cilium/ebpf v0.12.3 h1:8ht6F9MquybnY97at+VDZb3eQQr8ev79RueWeVaEcG4=
292292
github.com/cilium/ebpf v0.12.3/go.mod h1:TctK1ivibvI3znr66ljgi4hqOT8EYQjz1KWBfb1UVgM=
293293
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
294+
github.com/cloudevents/sdk-go/v2 v2.12.0 h1:p1k+ysVOZtNiXfijnwB3WqZNA3y2cGOiKQygWkUHCEI=
295+
github.com/cloudevents/sdk-go/v2 v2.12.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To=
296+
github.com/cloudevents/sdk-go/v2 v2.15.0 h1:aKnhLQhyoJXqEECQdOIZnbZ9VupqlidE6hedugDGr+I=
297+
github.com/cloudevents/sdk-go/v2 v2.15.0/go.mod h1:lL7kSWAE/V8VI4Wh0jbL2v/jvqsm6tjmaQBSvxcv4uE=
294298
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
295299
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
296300
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
@@ -559,6 +563,7 @@ github.com/jsimonetti/rtnetlink v1.4.0/go.mod h1:5W1jDvWdnthFJ7fxYX1GMK07BUpI4os
559563
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
560564
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
561565
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
566+
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
562567
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
563568
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
564569
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
@@ -640,9 +645,11 @@ github.com/mitchellh/pointerstructure v1.2.1/go.mod h1:BRAsLI5zgXmw97Lf6s25bs8oh
640645
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
641646
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
642647
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
648+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
643649
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
644650
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
645651
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
652+
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
646653
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
647654
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
648655
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=

internal/config/config.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,15 @@ func defaultConfig() *Config {
125125
Logging: Logging{
126126
Level: "info",
127127
},
128+
Events: Events{
129+
Log: EventsLogSink{
130+
Enabled: false,
131+
},
132+
File: EventsFileSink{
133+
Enabled: false,
134+
FileName: "events.log",
135+
},
136+
},
128137
}
129138
}
130139

@@ -146,6 +155,7 @@ type Config struct {
146155
Auth Auth `yaml:"auth,omitempty" envPrefix:"AUTH_"`
147156
DNS DNS `yaml:"dns,omitempty"`
148157
Logging Logging `yaml:"logging,omitempty" envPrefix:"LOGGING_"`
158+
Events Events `yaml:"events,omitempty" envPrefix:"EVENTS_"`
149159
}
150160

151161
type Tls struct {
@@ -168,6 +178,30 @@ type Logging struct {
168178
File string `yaml:"file,omitempty" env:"FILE"`
169179
}
170180

181+
type Events struct {
182+
Log EventsLogSink `yaml:"log,omitempty" envPrefix:"LOG_"`
183+
File EventsFileSink `yaml:"file,omitempty" envPrefix:"FILE_"`
184+
Tcp EventsTcpSink `yaml:"tcp,omitempty" envPrefix:"TCP_"`
185+
}
186+
187+
type EventsLogSink struct {
188+
Enabled bool `yaml:"enabled,omitempty" env:"ENABLED"`
189+
}
190+
191+
type EventsFileSink struct {
192+
Enabled bool `yaml:"enabled,omitempty" env:"ENABLED"`
193+
Path string `yaml:"path,omitempty" env:"PATH"`
194+
FileName string `yaml:"name,omitempty" env:"NAME"`
195+
MaxBytes int `yaml:"max_bytes,omitempty" env:"MAX_BYTES"`
196+
MaxDuration time.Duration `yaml:"max_duration,omitempty" env:"MAX_DURATION"`
197+
MaxFiles int `yaml:"max_files,omitempty" env:"MAX_FILES"`
198+
}
199+
200+
type EventsTcpSink struct {
201+
Enabled bool `yaml:"enabled,omitempty" env:"ENABLED"`
202+
Addr string `yaml:"addr,omitempty" env:"ADDR"`
203+
}
204+
171205
type Database struct {
172206
Type string `yaml:"type,omitempty" env:"TYPE"`
173207
Url string `yaml:"url,omitempty" env:"URL"`

internal/eventlog/events.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package eventlog
2+
3+
import (
4+
cloudevents "github.com/cloudevents/sdk-go/v2"
5+
"github.com/jsiebens/ionscale/internal/domain"
6+
"math/big"
7+
)
8+
9+
const (
10+
tailnetCreated = "ionscale.tailnet.created"
11+
tailnetDeleted = "ionscale.tailnet.deleted"
12+
nodeCreated = "ionscale.node.created"
13+
)
14+
15+
func TailnetCreated(tailnet *domain.Tailnet, actor *domain.User) cloudevents.Event {
16+
data := &EventData{
17+
Tailnet: &Target{ID: idToStr(tailnet.ID), Name: tailnet.Name},
18+
Target: &Target{ID: idToStr(tailnet.ID), Name: tailnet.Name},
19+
Actor: system,
20+
}
21+
22+
if actor != nil {
23+
data.Actor = Actor{ID: idToStr(actor.ID), Name: actor.Name}
24+
}
25+
26+
event := cloudevents.NewEvent()
27+
event.SetType(tailnetCreated)
28+
_ = event.SetData(cloudevents.ApplicationJSON, data)
29+
30+
return event
31+
}
32+
33+
func MachineCreated(machine *domain.Machine, actor *domain.User) cloudevents.Event {
34+
data := &EventData{
35+
Tailnet: &Target{ID: idToStr(machine.Tailnet.ID), Name: machine.Tailnet.Name},
36+
Target: &Target{ID: idToStr(machine.ID), Name: machine.CompleteName(), Addresses: machine.IPs()},
37+
Actor: UserToActor(actor),
38+
}
39+
40+
event := cloudevents.NewEvent()
41+
event.SetType(nodeCreated)
42+
_ = event.SetData(cloudevents.ApplicationJSON, data)
43+
44+
return event
45+
}
46+
47+
func UserToActor(actor *domain.User) Actor {
48+
if actor == nil {
49+
return system
50+
}
51+
52+
switch actor.UserType {
53+
case domain.UserTypePerson:
54+
return Actor{ID: idToStr(actor.ID), Name: actor.Name}
55+
default:
56+
return system
57+
}
58+
}
59+
60+
type EventData struct {
61+
Tailnet *Target `json:"tailnet,omitempty"`
62+
Target *Target `json:"target,omitempty"`
63+
Actor Actor `json:"actor"`
64+
}
65+
66+
type Target struct {
67+
ID string `json:"id"`
68+
Name string `json:"name"`
69+
Addresses []string `json:"addresses,omitempty"`
70+
}
71+
72+
type Actor struct {
73+
ID string `json:"id,omitempty"`
74+
Name string `json:"name"`
75+
}
76+
77+
func idToStr(id uint64) string {
78+
return big.NewInt(int64(id)).Text(10)
79+
}
80+
81+
var system = Actor{ID: "", Name: "ionscale system"}

internal/eventlog/global.go

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package eventlog
2+
3+
import (
4+
"bytes"
5+
"context"
6+
cloudevents "github.com/cloudevents/sdk-go/v2"
7+
"github.com/hashicorp/go-multierror"
8+
"github.com/jsiebens/ionscale/internal/config"
9+
"github.com/jsiebens/ionscale/internal/util"
10+
"go.uber.org/zap"
11+
"io"
12+
"os"
13+
"path/filepath"
14+
"sync"
15+
"time"
16+
)
17+
18+
const (
19+
stdout = "/dev/stdout"
20+
stderr = "/dev/stderr"
21+
devnull = "/dev/null"
22+
)
23+
24+
type Events []cloudevents.Event
25+
26+
func (e *Events) Add(event cloudevents.Event) {
27+
x := append(*e, event)
28+
*e = x
29+
}
30+
31+
type Eventer interface {
32+
Send(ctx context.Context, events ...cloudevents.Event) error
33+
}
34+
35+
type eventer struct {
36+
source string
37+
sinks []sink
38+
}
39+
40+
func (e *eventer) Send(ctx context.Context, events ...cloudevents.Event) error {
41+
groupID := util.NextIDString()
42+
now := time.Now()
43+
44+
for _, event := range events {
45+
event.SetSource(e.source)
46+
event.SetID(util.NextIDString())
47+
event.SetTime(now)
48+
event.SetExtension("eventGroupID", groupID)
49+
}
50+
51+
var r *multierror.Error
52+
for _, s := range e.sinks {
53+
r = multierror.Append(r, s.process(ctx, events...))
54+
}
55+
56+
return r.ErrorOrNil()
57+
}
58+
59+
type sink interface {
60+
process(context.Context, ...cloudevents.Event) error
61+
}
62+
63+
var (
64+
_globalMu sync.RWMutex
65+
_globalE Eventer = &eventer{}
66+
)
67+
68+
func Configure(c *config.Config) error {
69+
var sinks []sink
70+
71+
if c.Events.Log.Enabled {
72+
sinks = append(sinks, &zapSink{logger: zap.L().Named("events").WithOptions(zap.AddCallerSkip(3))})
73+
}
74+
75+
if c.Events.File.Enabled {
76+
switch c.Events.File.Path {
77+
case devnull:
78+
// ignore
79+
case stderr:
80+
sinks = append(sinks, &writerSink{w: os.Stderr})
81+
case stdout:
82+
sinks = append(sinks, &writerSink{w: os.Stdout})
83+
default:
84+
abs, err := filepath.Abs(c.Events.File.Path)
85+
if err != nil {
86+
return err
87+
}
88+
89+
sinks = append(sinks, &fileSink{
90+
path: abs,
91+
fileName: c.Events.File.FileName,
92+
maxBytes: c.Events.File.MaxBytes,
93+
maxDuration: c.Events.File.MaxDuration,
94+
maxFiles: c.Events.File.MaxFiles,
95+
})
96+
}
97+
}
98+
99+
_globalMu.Lock()
100+
defer _globalMu.Unlock()
101+
_globalE = &eventer{
102+
source: c.ServerUrl,
103+
sinks: sinks,
104+
}
105+
106+
return nil
107+
}
108+
109+
func Send(ctx context.Context, events ...cloudevents.Event) {
110+
_globalMu.RLock()
111+
l := _globalE
112+
_globalMu.RUnlock()
113+
114+
if err := l.Send(ctx, events...); err != nil {
115+
zap.L().Error("error while processing event", zap.Error(err))
116+
}
117+
}
118+
119+
func writeJSONLine(w io.Writer, events ...cloudevents.Event) (int, error) {
120+
var payload bytes.Buffer
121+
122+
for _, event := range events {
123+
eventJson, err := event.MarshalJSON()
124+
if err != nil {
125+
return 0, err
126+
}
127+
128+
payload.Write(eventJson)
129+
payload.Write([]byte("\n"))
130+
}
131+
132+
return w.Write(payload.Bytes())
133+
}

0 commit comments

Comments
 (0)