Skip to content

feat: initial event sink implementation #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudevents/sdk-go/v2 v2.15.0 // indirect
github.com/containerd/continuity v0.4.3 // indirect
github.com/coreos/go-iptables v0.7.0 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
Expand Down Expand Up @@ -132,6 +133,7 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/native v1.1.1-0.20230202152459-5c7d0dd6ab86 // indirect
github.com/jsimonetti/rtnetlink v1.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/labstack/gommon v0.4.2 // indirect
Expand All @@ -145,6 +147,8 @@ require (
github.com/miekg/dns v1.1.57 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc6 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
github.com/cilium/ebpf v0.12.3 h1:8ht6F9MquybnY97at+VDZb3eQQr8ev79RueWeVaEcG4=
github.com/cilium/ebpf v0.12.3/go.mod h1:TctK1ivibvI3znr66ljgi4hqOT8EYQjz1KWBfb1UVgM=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudevents/sdk-go/v2 v2.12.0 h1:p1k+ysVOZtNiXfijnwB3WqZNA3y2cGOiKQygWkUHCEI=
github.com/cloudevents/sdk-go/v2 v2.12.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To=
github.com/cloudevents/sdk-go/v2 v2.15.0 h1:aKnhLQhyoJXqEECQdOIZnbZ9VupqlidE6hedugDGr+I=
github.com/cloudevents/sdk-go/v2 v2.15.0/go.mod h1:lL7kSWAE/V8VI4Wh0jbL2v/jvqsm6tjmaQBSvxcv4uE=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
Expand Down Expand Up @@ -559,6 +563,7 @@ github.com/jsimonetti/rtnetlink v1.4.0/go.mod h1:5W1jDvWdnthFJ7fxYX1GMK07BUpI4os
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
Expand Down Expand Up @@ -640,9 +645,11 @@ github.com/mitchellh/pointerstructure v1.2.1/go.mod h1:BRAsLI5zgXmw97Lf6s25bs8oh
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
Expand Down
34 changes: 34 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,15 @@ func defaultConfig() *Config {
Logging: Logging{
Level: "info",
},
Events: Events{
Log: EventsLogSink{
Enabled: false,
},
File: EventsFileSink{
Enabled: false,
FileName: "events.log",
},
},
}
}

Expand All @@ -143,6 +152,7 @@ type Config struct {
Auth Auth `yaml:"auth,omitempty" envPrefix:"AUTH_"`
DNS DNS `yaml:"dns,omitempty"`
Logging Logging `yaml:"logging,omitempty" envPrefix:"LOGGING_"`
Events Events `yaml:"events,omitempty" envPrefix:"EVENTS_"`

WebPublicUrl *url.URL `yaml:"-"`
}
Expand All @@ -167,6 +177,30 @@ type Logging struct {
File string `yaml:"file,omitempty" env:"FILE"`
}

type Events struct {
Log EventsLogSink `yaml:"log,omitempty" envPrefix:"LOG_"`
File EventsFileSink `yaml:"file,omitempty" envPrefix:"FILE_"`
Tcp EventsTcpSink `yaml:"tcp,omitempty" envPrefix:"TCP_"`
}

type EventsLogSink struct {
Enabled bool `yaml:"enabled,omitempty" env:"ENABLED"`
}

type EventsFileSink struct {
Enabled bool `yaml:"enabled,omitempty" env:"ENABLED"`
Path string `yaml:"path,omitempty" env:"PATH"`
FileName string `yaml:"name,omitempty" env:"NAME"`
MaxBytes int `yaml:"max_bytes,omitempty" env:"MAX_BYTES"`
MaxDuration time.Duration `yaml:"max_duration,omitempty" env:"MAX_DURATION"`
MaxFiles int `yaml:"max_files,omitempty" env:"MAX_FILES"`
}

type EventsTcpSink struct {
Enabled bool `yaml:"enabled,omitempty" env:"ENABLED"`
Addr string `yaml:"addr,omitempty" env:"ADDR"`
}

type Database struct {
Type string `yaml:"type,omitempty" env:"TYPE"`
Url string `yaml:"url,omitempty" env:"URL"`
Expand Down
146 changes: 146 additions & 0 deletions internal/eventlog/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package eventlog

import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/jsiebens/ionscale/internal/domain"
"math/big"
)

const (
tailnetCreated = "ionscale.tailnet.create"
tailnetIamUpdated = "ionscale.tailnet.iam.update"
tailnetAclUpdated = "ionscale.tailnet.acl.update"
tailnetDNSConfigUpdated = "ionscale.tailnet.dns_config.update"
nodeCreated = "ionscale.node.create"
)

func TailnetCreated(tailnet *domain.Tailnet, actor ActorOpts) cloudevents.Event {
data := &EventData[any]{
Tailnet: &Target{ID: idToStr(tailnet.ID), Name: tailnet.Name},
Target: &Target{ID: idToStr(tailnet.ID), Name: tailnet.Name},
Actor: actor(),
}

event := cloudevents.NewEvent()
event.SetType(tailnetCreated)
_ = event.SetData(cloudevents.ApplicationJSON, data)

return event
}

func TailnetIAMUpdated(tailnet *domain.Tailnet, old *domain.IAMPolicy, actor ActorOpts) cloudevents.Event {
data := &EventData[*domain.IAMPolicy]{
Tailnet: &Target{ID: idToStr(tailnet.ID), Name: tailnet.Name},
Target: &Target{ID: idToStr(tailnet.ID), Name: tailnet.Name},
Actor: actor(),
Attr: &Attr[*domain.IAMPolicy]{
New: &tailnet.IAMPolicy,
Old: old,
},
}

event := cloudevents.NewEvent()
event.SetType(tailnetIamUpdated)
_ = event.SetData(cloudevents.ApplicationJSON, data)

return event
}

func TailnetACLUpdated(tailnet *domain.Tailnet, old *domain.ACLPolicy, actor ActorOpts) cloudevents.Event {
data := &EventData[*domain.ACLPolicy]{
Tailnet: &Target{ID: idToStr(tailnet.ID), Name: tailnet.Name},
Target: &Target{ID: idToStr(tailnet.ID), Name: tailnet.Name},
Actor: actor(),
Attr: &Attr[*domain.ACLPolicy]{
New: &tailnet.ACLPolicy,
Old: old,
},
}

event := cloudevents.NewEvent()
event.SetType(tailnetAclUpdated)
_ = event.SetData(cloudevents.ApplicationJSON, data)

return event
}

func TailnetDNSConfigUpdated(tailnet *domain.Tailnet, old *domain.DNSConfig, actor ActorOpts) cloudevents.Event {
data := &EventData[*domain.DNSConfig]{
Tailnet: &Target{ID: idToStr(tailnet.ID), Name: tailnet.Name},
Target: &Target{ID: idToStr(tailnet.ID), Name: tailnet.Name},
Actor: actor(),
Attr: &Attr[*domain.DNSConfig]{
New: &tailnet.DNSConfig,
Old: old,
},
}

event := cloudevents.NewEvent()
event.SetType(tailnetDNSConfigUpdated)
_ = event.SetData(cloudevents.ApplicationJSON, data)

return event
}

func MachineCreated(machine *domain.Machine, actor ActorOpts) cloudevents.Event {
data := &EventData[any]{
Tailnet: &Target{ID: idToStr(machine.Tailnet.ID), Name: machine.Tailnet.Name},
Target: &Target{ID: idToStr(machine.ID), Name: machine.CompleteName()},
Actor: actor(),
}

event := cloudevents.NewEvent()
event.SetType(nodeCreated)
_ = event.SetData(cloudevents.ApplicationJSON, data)

return event
}

type ActorOpts func() Actor

func User(u *domain.User) ActorOpts {
if u == nil {
return SystemAdmin()
}

switch u.UserType {
case domain.UserTypePerson:
return func() Actor {
return Actor{ID: idToStr(u.ID), Name: u.Name}
}
default:
return SystemAdmin()
}
}

func SystemAdmin() ActorOpts {
return func() Actor {
return Actor{ID: "", Name: "system admin"}
}
}

type EventData[T any] struct {
Tailnet *Target `json:"tailnet,omitempty"`
Target *Target `json:"target,omitempty"`
Attr *Attr[T] `json:"attr,omitempty"`
Actor Actor `json:"actor"`
}

type Target struct {
ID string `json:"id"`
Name string `json:"name"`
}

type Actor struct {
ID string `json:"id,omitempty"`
Name string `json:"name"`
}

type Attr[T any] struct {
New T `json:"new"`
Old T `json:"old,omitempty"`
}

func idToStr(id uint64) string {
return big.NewInt(int64(id)).Text(10)
}
133 changes: 133 additions & 0 deletions internal/eventlog/global.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package eventlog

import (
"bytes"
"context"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/hashicorp/go-multierror"
"github.com/jsiebens/ionscale/internal/config"
"github.com/jsiebens/ionscale/internal/util"
"go.uber.org/zap"
"io"
"os"
"path/filepath"
"sync"
"time"
)

const (
stdout = "/dev/stdout"
stderr = "/dev/stderr"
devnull = "/dev/null"
)

type Events []cloudevents.Event

func (e *Events) Add(event cloudevents.Event) {
x := append(*e, event)
*e = x
}

type Eventer interface {
Send(ctx context.Context, events ...cloudevents.Event) error
}

type eventer struct {
source string
sinks []sink
}

func (e *eventer) Send(ctx context.Context, events ...cloudevents.Event) error {
groupID := util.NextIDString()
now := time.Now()

for _, event := range events {
event.SetSource(e.source)
event.SetID(util.NextIDString())
event.SetTime(now)
event.SetExtension("eventGroupID", groupID)
}

var r *multierror.Error
for _, s := range e.sinks {
r = multierror.Append(r, s.process(ctx, events...))
}

return r.ErrorOrNil()
}

type sink interface {
process(context.Context, ...cloudevents.Event) error
}

var (
_globalMu sync.RWMutex
_globalE Eventer = &eventer{}
)

func Configure(c *config.Config) error {
var sinks []sink

if c.Events.Log.Enabled {
sinks = append(sinks, &zapSink{logger: zap.L().Named("events").WithOptions(zap.AddCallerSkip(3))})
}

if c.Events.File.Enabled {
switch c.Events.File.Path {
case devnull:
// ignore
case stderr:
sinks = append(sinks, &writerSink{w: os.Stderr})
case stdout:
sinks = append(sinks, &writerSink{w: os.Stdout})
default:
abs, err := filepath.Abs(c.Events.File.Path)
if err != nil {
return err
}

sinks = append(sinks, &fileSink{
path: abs,
fileName: c.Events.File.FileName,
maxBytes: c.Events.File.MaxBytes,
maxDuration: c.Events.File.MaxDuration,
maxFiles: c.Events.File.MaxFiles,
})
}
}

_globalMu.Lock()
defer _globalMu.Unlock()
_globalE = &eventer{
source: c.WebPublicUrl.String(),
sinks: sinks,
}

return nil
}

func Send(ctx context.Context, events ...cloudevents.Event) {
_globalMu.RLock()
l := _globalE
_globalMu.RUnlock()

if err := l.Send(ctx, events...); err != nil {
zap.L().Error("error while processing event", zap.Error(err))
}
}

func writeJSONLine(w io.Writer, events ...cloudevents.Event) (int, error) {
var payload bytes.Buffer

for _, event := range events {
eventJson, err := event.MarshalJSON()
if err != nil {
return 0, err
}

payload.Write(eventJson)
payload.Write([]byte("\n"))
}

return w.Write(payload.Bytes())
}
Loading