|
1 |
| -//go:build wasi |
2 |
| -// +build wasi |
3 |
| - |
4 | 1 | /*
|
5 | 2 | * Copyright 2024 Function Stream Org.
|
6 | 3 | *
|
|
19 | 16 |
|
20 | 17 | package gofs
|
21 | 18 |
|
22 |
| -import "C" |
23 | 19 | import (
|
24 | 20 | "encoding/json"
|
25 | 21 | "fmt"
|
26 |
| - "github.com/wirelessr/avroschema" |
27 | 22 | "os"
|
28 |
| - "syscall" |
| 23 | + "sync" |
| 24 | + "time" |
| 25 | + |
| 26 | + "github.com/wirelessr/avroschema" |
| 27 | +) |
| 28 | + |
| 29 | +const ( |
| 30 | + StateInit int32 = iota |
| 31 | + StateRunning |
| 32 | +) |
| 33 | + |
| 34 | +const ( |
| 35 | + FSSocketPath = "FS_SOCKET_PATH" |
| 36 | + FSFunctionName = "FS_FUNCTION_NAME" |
| 37 | + FSModuleName = "FS_MODULE_NAME" |
| 38 | + DefaultModule = "default" |
29 | 39 | )
|
30 | 40 |
|
31 |
| -var processFd int |
32 |
| -var registerSchemaFd int |
| 41 | +var ( |
| 42 | + ErrRegisterModuleDuringRunning = fmt.Errorf("cannot register module during running") |
| 43 | + ErrAlreadyRunning = fmt.Errorf("already running") |
| 44 | +) |
33 | 45 |
|
34 |
| -func init() { |
35 |
| - processFd, _ = syscall.Open("/process", syscall.O_RDWR, 0) |
36 |
| - registerSchemaFd, _ = syscall.Open("/registerSchema", syscall.O_RDWR, 0) |
| 46 | +type FSClient interface { |
| 47 | + error |
| 48 | + Register(module string, wrapper *moduleWrapper) FSClient |
| 49 | + Run() error |
37 | 50 | }
|
38 | 51 |
|
39 |
| -var processFunc func([]byte) []byte |
| 52 | +type fsClient struct { |
| 53 | + rpc *fsRPCClient |
| 54 | + modules map[string]*moduleWrapper |
| 55 | + state int32 |
| 56 | + registerMu sync.Mutex |
| 57 | + err error |
| 58 | +} |
40 | 59 |
|
41 |
| -func Register[I any, O any](process func(*I) *O) error { |
42 |
| - outputSchema, err := avroschema.Reflect(new(O)) |
43 |
| - if err != nil { |
44 |
| - return err |
| 60 | +func NewFSClient() FSClient { |
| 61 | + return &fsClient{ |
| 62 | + modules: make(map[string]*moduleWrapper), |
| 63 | + state: StateInit, |
45 | 64 | }
|
46 |
| - processFunc = func(payload []byte) []byte { |
| 65 | +} |
| 66 | + |
| 67 | +type moduleWrapper struct { |
| 68 | + *fsClient |
| 69 | + processFunc func([]byte) []byte // Only for Function |
| 70 | + executeFunc func() error |
| 71 | + initFunc func() error |
| 72 | + registerErr error |
| 73 | +} |
| 74 | + |
| 75 | +func (c *fsClient) Register(module string, wrapper *moduleWrapper) FSClient { |
| 76 | + if c.err != nil { |
| 77 | + return c |
| 78 | + } |
| 79 | + c.registerMu.Lock() |
| 80 | + defer c.registerMu.Unlock() |
| 81 | + if c.state == StateRunning { |
| 82 | + c.err = ErrRegisterModuleDuringRunning |
| 83 | + return c |
| 84 | + } |
| 85 | + if wrapper.registerErr != nil { |
| 86 | + c.err = wrapper.registerErr |
| 87 | + return c |
| 88 | + } |
| 89 | + c.modules[module] = wrapper |
| 90 | + return c |
| 91 | +} |
| 92 | + |
| 93 | +func Function[I any, O any](process func(*I) *O) *moduleWrapper { |
| 94 | + processFunc := func(payload []byte) []byte { |
47 | 95 | input := new(I)
|
48 |
| - err = json.Unmarshal(payload, input) |
| 96 | + err := json.Unmarshal(payload, input) |
49 | 97 | if err != nil {
|
50 |
| - fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s", err, payload) |
51 |
| - return nil |
| 98 | + _, _ = fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s\n", err, payload) |
52 | 99 | }
|
53 | 100 | output := process(input)
|
54 | 101 | outputPayload, _ := json.Marshal(output)
|
55 | 102 | return outputPayload
|
56 | 103 | }
|
57 |
| - syscall.Write(registerSchemaFd, []byte(outputSchema)) |
58 |
| - return nil |
| 104 | + m := &moduleWrapper{} |
| 105 | + m.initFunc = func() error { |
| 106 | + outputSchema, err := avroschema.Reflect(new(O)) |
| 107 | + if err != nil { |
| 108 | + return err |
| 109 | + } |
| 110 | + err = m.rpc.RegisterSchema(outputSchema) |
| 111 | + if err != nil { |
| 112 | + return fmt.Errorf("failed to register schema: %w", err) |
| 113 | + } |
| 114 | + return nil |
| 115 | + } |
| 116 | + m.executeFunc = func() error { |
| 117 | + for { |
| 118 | + inputPayload, err := m.rpc.Read() |
| 119 | + if err != nil { |
| 120 | + _, _ = fmt.Fprintf(os.Stderr, "Failed to read: %s\n", err) |
| 121 | + time.Sleep(3 * time.Second) |
| 122 | + continue |
| 123 | + } |
| 124 | + outputPayload := processFunc(inputPayload) |
| 125 | + err = m.rpc.Write(outputPayload) |
| 126 | + if err != nil { |
| 127 | + _, _ = fmt.Fprintf(os.Stderr, "Failed to write: %s\n", err) |
| 128 | + } |
| 129 | + } |
| 130 | + } |
| 131 | + m.processFunc = processFunc |
| 132 | + return m |
| 133 | +} |
| 134 | + |
| 135 | +func Source[O any](process func(emit func(*O) error)) *moduleWrapper { |
| 136 | + m := &moduleWrapper{} |
| 137 | + emit := func(event *O) error { |
| 138 | + outputPayload, _ := json.Marshal(event) |
| 139 | + return m.rpc.Write(outputPayload) |
| 140 | + } |
| 141 | + m.initFunc = func() error { |
| 142 | + outputSchema, err := avroschema.Reflect(new(O)) |
| 143 | + if err != nil { |
| 144 | + return err |
| 145 | + } |
| 146 | + err = m.rpc.RegisterSchema(outputSchema) |
| 147 | + if err != nil { |
| 148 | + return fmt.Errorf("failed to register schema: %w", err) |
| 149 | + } |
| 150 | + return nil |
| 151 | + } |
| 152 | + m.executeFunc = func() error { |
| 153 | + process(emit) |
| 154 | + return nil |
| 155 | + } |
| 156 | + return m |
59 | 157 | }
|
60 | 158 |
|
61 |
| -//export process |
62 |
| -func process() { |
63 |
| - var stat syscall.Stat_t |
64 |
| - syscall.Fstat(processFd, &stat) |
65 |
| - payload := make([]byte, stat.Size) |
66 |
| - _, _ = syscall.Read(processFd, payload) |
67 |
| - outputPayload := processFunc(payload) |
68 |
| - _, _ = syscall.Write(processFd, outputPayload) |
| 159 | +func Sink[I any](process func(*I)) *moduleWrapper { |
| 160 | + processFunc := func(payload []byte) { |
| 161 | + input := new(I) |
| 162 | + err := json.Unmarshal(payload, input) |
| 163 | + if err != nil { |
| 164 | + _, _ = fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s\n", err, payload) |
| 165 | + } |
| 166 | + process(input) |
| 167 | + } |
| 168 | + m := &moduleWrapper{} |
| 169 | + m.initFunc = func() error { |
| 170 | + inputSchema, err := avroschema.Reflect(new(I)) |
| 171 | + if err != nil { |
| 172 | + return err |
| 173 | + } |
| 174 | + err = m.rpc.RegisterSchema(inputSchema) |
| 175 | + if err != nil { |
| 176 | + return fmt.Errorf("failed to register schema: %w", err) |
| 177 | + } |
| 178 | + return nil |
| 179 | + } |
| 180 | + m.executeFunc = func() error { |
| 181 | + for { |
| 182 | + inputPayload, err := m.rpc.Read() |
| 183 | + if err != nil { |
| 184 | + _, _ = fmt.Fprintf(os.Stderr, "Failed to read: %s\n", err) |
| 185 | + time.Sleep(3 * time.Second) |
| 186 | + continue |
| 187 | + } |
| 188 | + processFunc(inputPayload) |
| 189 | + } |
| 190 | + } |
| 191 | + return m |
| 192 | +} |
| 193 | + |
| 194 | +func (c *fsClient) Run() error { |
| 195 | + if c.err != nil { |
| 196 | + return c.err |
| 197 | + } |
| 198 | + c.registerMu.Lock() |
| 199 | + if c.state == StateRunning { |
| 200 | + c.registerMu.Unlock() |
| 201 | + return ErrAlreadyRunning |
| 202 | + } |
| 203 | + c.state = StateRunning |
| 204 | + c.registerMu.Unlock() |
| 205 | + |
| 206 | + if c.rpc == nil { |
| 207 | + rpc, err := newFSRPCClient() |
| 208 | + if err != nil { |
| 209 | + return err |
| 210 | + } |
| 211 | + c.rpc = rpc |
| 212 | + } |
| 213 | + module := os.Getenv(FSModuleName) |
| 214 | + if module == "" { |
| 215 | + module = DefaultModule |
| 216 | + } |
| 217 | + m, ok := c.modules[module] |
| 218 | + if !ok { |
| 219 | + return fmt.Errorf("module %s not found", module) |
| 220 | + } |
| 221 | + m.fsClient = c |
| 222 | + err := m.initFunc() |
| 223 | + if err != nil { |
| 224 | + return err |
| 225 | + } |
| 226 | + c.rpc.loadModule(m) |
| 227 | + if c.rpc.skipExecuting() { |
| 228 | + return nil |
| 229 | + } |
| 230 | + return m.executeFunc() |
69 | 231 | }
|
70 | 232 |
|
71 |
| -func Run() { |
72 |
| - // Leave it empty |
| 233 | +func (c *fsClient) Error() string { |
| 234 | + return c.err.Error() |
73 | 235 | }
|
0 commit comments