@@ -75,6 +75,7 @@ func (g *Generator) RunBlockingWait(ctx context.Context, version ech.Version, in
75
75
if ! integrations {
76
76
return fmt .Errorf ("failed to wait for apm server: %w" , err )
77
77
}
78
+ g .logger .Info ("re-apply apm policy" )
78
79
if err = g .reapplyAPMPolicy (ctx , version ); err != nil {
79
80
return fmt .Errorf ("failed to re-apply apm policy: %w" , err )
80
81
}
@@ -84,21 +85,12 @@ func (g *Generator) RunBlockingWait(ctx context.Context, version ech.Version, in
84
85
}
85
86
86
87
g .logger .Info ("ingest data" )
87
- if err := g .runBlocking (ctx , version ); err != nil {
88
+ if err := g .retryRunBlocking (ctx , version , 2 ); err != nil {
88
89
return fmt .Errorf ("cannot run generator: %w" , err )
89
90
}
90
91
91
- // With Fleet managed APM server, we can trigger metrics flush.
92
- if integrations {
93
- g .logger .Info ("flush apm metrics" )
94
- if err := g .flushAPMMetrics (ctx , version ); err != nil {
95
- return fmt .Errorf ("cannot flush apm metrics: %w" , err )
96
- }
97
- return nil
98
- }
99
-
100
- // With standalone, we don't have Fleet, so simply just wait for some arbitrary time.
101
- time .Sleep (180 * time .Second )
92
+ // Simply wait for some arbitrary time, for the data to be flushed.
93
+ time .Sleep (200 * time .Second )
102
94
return nil
103
95
}
104
96
@@ -159,9 +151,35 @@ func (g *Generator) runBlocking(ctx context.Context, version ech.Version) error
159
151
return gen .RunBlocking (ctx )
160
152
}
161
153
154
+ // retryRunBlocking executes runBlocking. If it fails, it will retry up to retryTimes.
155
+ func (g * Generator ) retryRunBlocking (ctx context.Context , version ech.Version , retryTimes int ) error {
156
+ // No error, don't need to retry.
157
+ if err := g .runBlocking (ctx , version ); err == nil {
158
+ return nil
159
+ }
160
+
161
+ // Otherwise, retry until success or run out of attempts.
162
+ var finalErr error
163
+ for i := 0 ; i < retryTimes ; i ++ {
164
+ // Wait for some time before retrying.
165
+ time .Sleep (time .Duration (i ) * 30 * time .Second )
166
+
167
+ g .logger .Info (fmt .Sprintf ("retrying ingest data attempt %d" , i + 1 ))
168
+ err := g .runBlocking (ctx , version )
169
+ // Retry success, simply return.
170
+ if err == nil {
171
+ return nil
172
+ }
173
+
174
+ finalErr = err
175
+ }
176
+
177
+ return finalErr
178
+ }
179
+
162
180
func (g * Generator ) reapplyAPMPolicy (ctx context.Context , version ech.Version ) error {
163
181
policyID := "elastic-cloud-apm"
164
- description := fmt .Sprintf ("%s %s" , version , rand .Text ()[5 : ])
182
+ description := fmt .Sprintf ("%s %s" , version , rand .Text ()[: 10 ])
165
183
166
184
if err := g .kbc .UpdatePackagePolicyDescriptionByID (ctx , policyID , version , description ); err != nil {
167
185
return fmt .Errorf (
@@ -173,22 +191,6 @@ func (g *Generator) reapplyAPMPolicy(ctx context.Context, version ech.Version) e
173
191
return nil
174
192
}
175
193
176
- // flushAPMMetrics sends an update to the Fleet APM package policy in order
177
- // to trigger the flushing of in-flight APM metrics.
178
- func (g * Generator ) flushAPMMetrics (ctx context.Context , version ech.Version ) error {
179
- // Re-applying the Elastic APM policy is enough to trigger final aggregations
180
- // in APM Server and flush of in-flight metrics.
181
- if err := g .reapplyAPMPolicy (ctx , version ); err != nil {
182
- return err
183
- }
184
-
185
- // APM Server needs some time to flush all metrics, and we don't have any
186
- // visibility on when this completes.
187
- // NOTE: This value comes from empirical observations.
188
- time .Sleep (120 * time .Second )
189
- return nil
190
- }
191
-
192
194
type apmInfoResp struct {
193
195
Version string `json:"version"`
194
196
PublishReady bool `json:"publish_ready"`
0 commit comments