Skip to content

Commit fdbe1b5

Browse files
authored
New modules import http (#6414)
* add the import source "import.http" import.http uses the component remote.http to fetch a module from a http server * add tests
1 parent 7267e0f commit fdbe1b5

File tree

9 files changed

+149
-8
lines changed

9 files changed

+149
-8
lines changed

pkg/flow/import_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"io/fs"
66
"os"
77
"path"
8+
"path/filepath"
89
"strings"
910
"sync"
1011
"testing"
@@ -109,6 +110,17 @@ func TestImportString(t *testing.T) {
109110
}
110111
}
111112

113+
func TestImportHTTP(t *testing.T) {
114+
directory := "./testdata/import_http"
115+
for _, file := range getTestFiles(directory, t) {
116+
archive, err := txtar.ParseFile(filepath.Join(directory, file.Name()))
117+
require.NoError(t, err)
118+
t.Run(archive.Files[0].Name, func(t *testing.T) {
119+
testConfig(t, string(archive.Files[0].Data), "", nil)
120+
})
121+
}
122+
}
123+
112124
type testImportError struct {
113125
description string
114126
main string

pkg/flow/internal/controller/node_config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d
2727
return NewLoggingConfigNode(block, globals), nil
2828
case tracingBlockID:
2929
return NewTracingConfigNode(block, globals), nil
30-
case importsource.BlockImportFile, importsource.BlockImportString:
30+
case importsource.BlockImportFile, importsource.BlockImportString, importsource.BlockImportHTTP:
3131
return NewImportConfigNode(block, globals, importsource.GetSourceType(block.GetBlockName())), nil
3232
default:
3333
var diags diag.Diagnostics

pkg/flow/internal/controller/node_config_import.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ func (cn *ImportConfigNode) processImportedContent(content *ast.File) error {
248248
switch componentName {
249249
case declareType:
250250
cn.processDeclareBlock(blockStmt)
251-
case importsource.BlockImportFile, importsource.BlockImportString: // TODO: add other import sources
251+
case importsource.BlockImportFile, importsource.BlockImportString, importsource.BlockImportHTTP:
252252
err := cn.processImportBlock(blockStmt, componentName)
253253
if err != nil {
254254
return err

pkg/flow/internal/importsource/import_file.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ func NewImportFile(managedOpts component.Options, eval *vm.Evaluator, onContentC
3232
}
3333
}
3434

35-
// Arguments holds values which are used to configure the local.file component.
36-
type Arguments struct {
35+
// FileArguments holds values which are used to configure the local.file component.
36+
type FileArguments struct {
3737
// Filename indicates the file to watch.
3838
Filename string `river:"filename,attr"`
3939
// Type indicates how to detect changes to the file.
@@ -42,18 +42,18 @@ type Arguments struct {
4242
PollFrequency time.Duration `river:"poll_frequency,attr,optional"`
4343
}
4444

45-
var DefaultArguments = Arguments{
45+
var DefaultFileArguments = FileArguments{
4646
Type: file.DetectorFSNotify,
4747
PollFrequency: time.Minute,
4848
}
4949

5050
type importFileConfigBlock struct {
51-
LocalFileArguments Arguments `river:",squash"`
51+
LocalFileArguments FileArguments `river:",squash"`
5252
}
5353

5454
// SetToDefault implements river.Defaulter.
5555
func (a *importFileConfigBlock) SetToDefault() {
56-
a.LocalFileArguments = DefaultArguments
56+
a.LocalFileArguments = DefaultFileArguments
5757
}
5858

5959
func (im *ImportFile) Evaluate(scope *vm.Scope) error {
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package importsource
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"reflect"
8+
"time"
9+
10+
"github.com/grafana/agent/component"
11+
common_config "github.com/grafana/agent/component/common/config"
12+
remote_http "github.com/grafana/agent/component/remote/http"
13+
"github.com/grafana/river/vm"
14+
)
15+
16+
// ImportHTTP imports a module from a HTTP server via the remote.http component.
17+
type ImportHTTP struct {
18+
managedRemoteHTTP *remote_http.Component
19+
arguments component.Arguments
20+
managedOpts component.Options
21+
eval *vm.Evaluator
22+
}
23+
24+
var _ ImportSource = (*ImportHTTP)(nil)
25+
26+
func NewImportHTTP(managedOpts component.Options, eval *vm.Evaluator, onContentChange func(string)) *ImportHTTP {
27+
opts := managedOpts
28+
opts.OnStateChange = func(e component.Exports) {
29+
onContentChange(e.(remote_http.Exports).Content.Value)
30+
}
31+
return &ImportHTTP{
32+
managedOpts: opts,
33+
eval: eval,
34+
}
35+
}
36+
37+
// HTTPArguments holds values which are used to configure the remote.http component.
38+
type HTTPArguments struct {
39+
URL string `river:"url,attr"`
40+
PollFrequency time.Duration `river:"poll_frequency,attr,optional"`
41+
PollTimeout time.Duration `river:"poll_timeout,attr,optional"`
42+
43+
Method string `river:"method,attr,optional"`
44+
Headers map[string]string `river:"headers,attr,optional"`
45+
Body string `river:"body,attr,optional"`
46+
47+
Client common_config.HTTPClientConfig `river:"client,block,optional"`
48+
}
49+
50+
// DefaultHTTPArguments holds default settings for HTTPArguments.
51+
var DefaultHTTPArguments = HTTPArguments{
52+
PollFrequency: 1 * time.Minute,
53+
PollTimeout: 10 * time.Second,
54+
Client: common_config.DefaultHTTPClientConfig,
55+
Method: http.MethodGet,
56+
}
57+
58+
// SetToDefault implements river.Defaulter.
59+
func (args *HTTPArguments) SetToDefault() {
60+
*args = DefaultHTTPArguments
61+
}
62+
63+
func (im *ImportHTTP) Evaluate(scope *vm.Scope) error {
64+
var arguments HTTPArguments
65+
if err := im.eval.Evaluate(scope, &arguments); err != nil {
66+
return fmt.Errorf("decoding River: %w", err)
67+
}
68+
if im.managedRemoteHTTP == nil {
69+
var err error
70+
im.managedRemoteHTTP, err = remote_http.New(im.managedOpts, remote_http.Arguments{
71+
URL: arguments.URL,
72+
PollFrequency: arguments.PollFrequency,
73+
PollTimeout: arguments.PollTimeout,
74+
Method: arguments.Method,
75+
Headers: arguments.Headers,
76+
Body: arguments.Body,
77+
Client: arguments.Client,
78+
})
79+
if err != nil {
80+
return fmt.Errorf("creating http component: %w", err)
81+
}
82+
im.arguments = arguments
83+
}
84+
85+
if reflect.DeepEqual(im.arguments, arguments) {
86+
return nil
87+
}
88+
89+
// Update the existing managed component
90+
if err := im.managedRemoteHTTP.Update(arguments); err != nil {
91+
return fmt.Errorf("updating component: %w", err)
92+
}
93+
im.arguments = arguments
94+
return nil
95+
}
96+
97+
func (im *ImportHTTP) Run(ctx context.Context) error {
98+
return im.managedRemoteHTTP.Run(ctx)
99+
}
100+
101+
func (im *ImportHTTP) CurrentHealth() component.Health {
102+
return im.managedRemoteHTTP.CurrentHealth()
103+
}

pkg/flow/internal/importsource/import_source.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@ type SourceType int
1313
const (
1414
File SourceType = iota
1515
String
16+
HTTP
1617
)
1718

1819
const (
1920
BlockImportFile = "import.file"
2021
BlockImportString = "import.string"
22+
BlockImportHTTP = "import.http"
2123
)
2224

2325
// ImportSource retrieves a module from a source.
@@ -38,6 +40,8 @@ func NewImportSource(sourceType SourceType, managedOpts component.Options, eval
3840
return NewImportFile(managedOpts, eval, onContentChange)
3941
case String:
4042
return NewImportString(eval, onContentChange)
43+
case HTTP:
44+
return NewImportHTTP(managedOpts, eval, onContentChange)
4145
}
4246
panic(fmt.Errorf("unsupported source type: %v", sourceType))
4347
}
@@ -49,6 +53,8 @@ func GetSourceType(fullName string) SourceType {
4953
return File
5054
case BlockImportString:
5155
return String
56+
case BlockImportHTTP:
57+
return HTTP
5258
}
5359
panic(fmt.Errorf("name does not map to a known source type: %v", fullName))
5460
}

pkg/flow/module_eval_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,5 +259,6 @@ func verifyNoGoroutineLeaks(t *testing.T) {
259259
t,
260260
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
261261
goleak.IgnoreTopFunction("go.opentelemetry.io/otel/sdk/trace.(*batchSpanProcessor).processQueue"),
262+
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), // related to TCP keep alive
262263
)
263264
}

pkg/flow/source.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func sourceFromBody(body ast.Body) (*Source, error) {
7575
switch fullName {
7676
case "declare":
7777
declares = append(declares, stmt)
78-
case "logging", "tracing", "argument", "export", "import.file", "import.string":
78+
case "logging", "tracing", "argument", "export", "import.file", "import.string", "import.http":
7979
configs = append(configs, stmt)
8080
default:
8181
components = append(components, stmt)
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
Import passthrough module.
2+
3+
-- main.river --
4+
testcomponents.count "inc" {
5+
frequency = "10ms"
6+
max = 10
7+
}
8+
9+
import.http "testImport" {
10+
url = "https://raw.githubusercontent.com/wildum/module/master/module_passthrough.river"
11+
}
12+
13+
testImport.a "cc" {
14+
input = testcomponents.count.inc.count
15+
}
16+
17+
testcomponents.summation "sum" {
18+
input = testImport.a.cc.output
19+
}

0 commit comments

Comments
 (0)