Skip to content

Commit 06762fd

Browse files
authored
DX-1829: Add period and rate to flow control and deprecate ratePerSecond (#101)
* feat: add flow control * fix: tests * fix: fmt * fix: mock qstash methods of client in test
1 parent 481f758 commit 06762fd

10 files changed

+45
-23
lines changed

bun.lockb

55 Bytes
Binary file not shown.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@
103103
},
104104
"dependencies": {
105105
"@ai-sdk/openai": "^1.2.1",
106-
"@upstash/qstash": "^2.7.22",
106+
"@upstash/qstash": "^2.8.1",
107107
"ai": "^4.1.54",
108108
"zod": "^3.24.1"
109109
},

src/agents/adapters.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ describe("wrapTools", () => {
1515
new WorkflowContext({
1616
headers: new Headers({}) as Headers,
1717
initialPayload: "mock",
18-
qstashClient: new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token }),
18+
qstashClient: new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token, enableTelemetry: false }),
1919
steps: [],
2020
url: WORKFLOW_ENDPOINT,
2121
workflowRunId,

src/context/auto-executor.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ describe("auto-executor", () => {
8585

8686
const getContext = (steps: Step[]) => {
8787
return new SpyWorkflowContext({
88-
qstashClient: new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token }),
88+
qstashClient: new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token, enableTelemetry: false }),
8989
workflowRunId,
9090
initialPayload,
9191
headers: new Headers({}) as Headers,

src/context/context.test.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@ import { upstash } from "@upstash/qstash";
1616

1717
describe("context tests", () => {
1818
const token = nanoid();
19-
const qstashClient = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token });
19+
const qstashClient = new Client({
20+
baseUrl: MOCK_QSTASH_SERVER_URL,
21+
token,
22+
enableTelemetry: false,
23+
});
2024
test("should raise when there are nested steps (with run)", () => {
2125
const context = new WorkflowContext({
2226
qstashClient,

src/qstash/headers.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,11 +251,14 @@ function addPrefixToHeaders(headers: Record<string, string>, prefix: string) {
251251

252252
export const prepareFlowControl = (flowControl: FlowControl) => {
253253
const parallelism = flowControl.parallelism?.toString();
254-
const rate = flowControl.ratePerSecond?.toString();
254+
const rate = (flowControl.rate ?? flowControl.ratePerSecond)?.toString();
255+
const period =
256+
typeof flowControl.period === "number" ? `${flowControl.period}s` : flowControl.period;
255257

256258
const controlValue = [
257259
parallelism ? `parallelism=${parallelism}` : undefined,
258260
rate ? `rate=${rate}` : undefined,
261+
period ? `period=${period}` : undefined,
259262
].filter(Boolean);
260263

261264
if (controlValue.length === 0) {

src/serve/authorization.test.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ import { DisabledWorkflowContext } from "./authorization";
1010

1111
describe("disabled workflow context", () => {
1212
const token = nanoid();
13-
const qstashClient = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token });
13+
const qstashClient = new Client({
14+
baseUrl: MOCK_QSTASH_SERVER_URL,
15+
token,
16+
enableTelemetry: false,
17+
});
1418
const disabledContext = new DisabledWorkflowContext({
1519
qstashClient,
1620
workflowRunId: "wfr-foo",

src/serve/serve-many.test.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ describe("serveMany", () => {
4141

4242
describe("serve tests", () => {
4343
const token = nanoid();
44-
const qstashClient = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token });
44+
const qstashClient = new Client({
45+
baseUrl: MOCK_QSTASH_SERVER_URL,
46+
token,
47+
enableTelemetry: false,
48+
});
4549

4650
const workflowOne = createWorkflow(
4751
async (context: WorkflowContext<number>) => {
@@ -160,7 +164,7 @@ describe("serveMany", () => {
160164
"Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"],
161165
"Upstash-Telemetry-Framework": ["nextjs"],
162166
"Upstash-Telemetry-Runtime": ["[email protected]"],
163-
"Upstash-Telemetry-Sdk": ["@upstash/[email protected].7"],
167+
"Upstash-Telemetry-Sdk": ["@upstash/[email protected].13"],
164168
"Upstash-Workflow-Init": ["false"],
165169
"Upstash-Workflow-RunId": ["wfr_id"],
166170
"Upstash-Workflow-Runid": ["wfr_id"],
@@ -215,7 +219,7 @@ describe("serveMany", () => {
215219
"Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"],
216220
"Upstash-Telemetry-Framework": ["nextjs"],
217221
"Upstash-Telemetry-Runtime": ["[email protected]"],
218-
"Upstash-Telemetry-Sdk": ["@upstash/[email protected].7"],
222+
"Upstash-Telemetry-Sdk": ["@upstash/[email protected].13"],
219223
"Upstash-Workflow-Init": ["false"],
220224
"Upstash-Workflow-RunId": ["wfr_id"],
221225
"Upstash-Workflow-Runid": ["wfr_id"],
@@ -289,7 +293,7 @@ describe("serveMany", () => {
289293
"upstash-retries": "0",
290294
"upstash-telemetry-framework": "nextjs",
291295
"upstash-telemetry-runtime": "[email protected]",
292-
"upstash-telemetry-sdk": "@upstash/[email protected].7",
296+
"upstash-telemetry-sdk": "@upstash/[email protected].13",
293297
"upstash-workflow-calltype": "toCallback",
294298
"upstash-workflow-init": "false",
295299
"upstash-workflow-runid": "wfr_id",

src/serve/serve.test.ts

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/* eslint-disable @typescript-eslint/no-unused-vars */
22
/* eslint-disable @typescript-eslint/require-await */
3-
import { describe, expect, spyOn, test } from "bun:test";
3+
import { describe, expect, jest, spyOn, test } from "bun:test";
44
import { serve } from ".";
55
import {
66
driveWorkflow,
@@ -32,7 +32,7 @@ const someWork = (input: string) => {
3232
const workflowRunId = `wfr${nanoid()}`;
3333
const token = nanoid();
3434

35-
const qstashClient = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token });
35+
const qstashClient = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token, enableTelemetry: false });
3636

3737
describe("serve", () => {
3838
test("should send create workflow request in initial request", async () => {
@@ -409,7 +409,7 @@ describe("serve", () => {
409409
"upstash-workflow-url": WORKFLOW_ENDPOINT,
410410
"upstash-telemetry-framework": "unknown",
411411
"upstash-telemetry-runtime": "unknown",
412-
"upstash-telemetry-sdk": "@upstash/[email protected].7",
412+
"upstash-telemetry-sdk": "@upstash/[email protected].13",
413413
},
414414
body: '{"stepId":3,"stepName":"step 3","stepType":"Run","out":"\\"combined results: result 1,result 2\\"","concurrent":1}',
415415
},
@@ -458,7 +458,7 @@ describe("serve", () => {
458458
"upstash-workflow-url": WORKFLOW_ENDPOINT,
459459
"upstash-telemetry-framework": "unknown",
460460
"upstash-telemetry-runtime": "unknown",
461-
"upstash-telemetry-sdk": "@upstash/[email protected].7",
461+
"upstash-telemetry-sdk": "@upstash/[email protected].13",
462462
},
463463
body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}',
464464
},
@@ -511,7 +511,7 @@ describe("serve", () => {
511511
"upstash-failure-callback-workflow-url": "https://requestcatcher.com/api",
512512
"upstash-telemetry-framework": "unknown",
513513
"upstash-telemetry-runtime": "unknown",
514-
"upstash-telemetry-sdk": "@upstash/[email protected].7",
514+
"upstash-telemetry-sdk": "@upstash/[email protected].13",
515515
},
516516
body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}',
517517
},
@@ -566,7 +566,7 @@ describe("serve", () => {
566566
"upstash-failure-callback-workflow-url": "https://requestcatcher.com/api",
567567
"upstash-telemetry-framework": "unknown",
568568
"upstash-telemetry-runtime": "unknown",
569-
"upstash-telemetry-sdk": "@upstash/[email protected].7",
569+
"upstash-telemetry-sdk": "@upstash/[email protected].13",
570570
},
571571
body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}',
572572
},
@@ -661,7 +661,7 @@ describe("serve", () => {
661661
"upstash-retries": "4",
662662
"upstash-telemetry-framework": "unknown",
663663
"upstash-telemetry-runtime": "unknown",
664-
"upstash-telemetry-sdk": "@upstash/[email protected].7",
664+
"upstash-telemetry-sdk": "@upstash/[email protected].13",
665665
"upstash-timeout": "10",
666666
"upstash-workflow-calltype": "toCallback",
667667
"upstash-workflow-init": "false",
@@ -875,7 +875,7 @@ describe("serve", () => {
875875
"Upstash-Workflow-Url": [WORKFLOW_ENDPOINT],
876876
"Upstash-Telemetry-Framework": ["unknown"],
877877
"Upstash-Telemetry-Runtime": ["unknown"],
878-
"Upstash-Telemetry-Sdk": ["@upstash/[email protected].7"],
878+
"Upstash-Telemetry-Sdk": ["@upstash/[email protected].13"],
879879
},
880880
timeoutUrl: WORKFLOW_ENDPOINT,
881881
url: WORKFLOW_ENDPOINT,
@@ -1017,6 +1017,10 @@ describe("serve", () => {
10171017

10181018
describe("incorrect url will throw", () => {
10191019
const qstashClient = new Client({ token: process.env.QSTASH_TOKEN! });
1020+
qstashClient.batch = jest
1021+
.fn()
1022+
.mockReturnValue([{ deduplicatedId: false, messageId: "some-message-id" }]);
1023+
qstashClient.publish = jest.fn({ deduplicatedId: false, messageId: "some-message-id" });
10201024
const client = new WorkflowClient({ token: process.env.QSTASH_TOKEN! });
10211025

10221026
test("allow http://", async () => {
@@ -1111,6 +1115,7 @@ describe("serve", () => {
11111115
headers: {
11121116
[header]: headerValue,
11131117
},
1118+
enableTelemetry: false,
11141119
});
11151120

11161121
const { handler: endpoint } = serve(
@@ -1158,7 +1163,7 @@ describe("serve", () => {
11581163
"upstash-forward-test-header": headerValue,
11591164
"upstash-telemetry-framework": "unknown",
11601165
"upstash-telemetry-runtime": "unknown",
1161-
"upstash-telemetry-sdk": "@upstash/[email protected].7",
1166+
"upstash-telemetry-sdk": "@upstash/[email protected].13",
11621167
},
11631168
body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}',
11641169
},

src/workflow-requests.test.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -525,8 +525,9 @@ describe("Workflow Requests", () => {
525525
undefined,
526526
{
527527
key: "call-flow-key",
528-
ratePerSecond: 5,
528+
rate: 5,
529529
parallelism: 6,
530+
period: 30,
530531
}
531532
);
532533
const { headers } = lazyStep.getHeaders({
@@ -539,8 +540,9 @@ describe("Workflow Requests", () => {
539540
initialPayload: undefined,
540541
flowControl: {
541542
key: "regular-flow-key",
542-
ratePerSecond: 3,
543+
rate: 3,
543544
parallelism: 4,
545+
period: "1m",
544546
},
545547
}),
546548
invokeCount: 3,
@@ -572,9 +574,9 @@ describe("Workflow Requests", () => {
572574
"content-type": "application/json",
573575
// flow control:
574576
"Upstash-Callback-Flow-Control-Key": "regular-flow-key",
575-
"Upstash-Callback-Flow-Control-Value": "parallelism=4, rate=3",
577+
"Upstash-Callback-Flow-Control-Value": "parallelism=4, rate=3, period=1m",
576578
"Upstash-Flow-Control-Key": "call-flow-key",
577-
"Upstash-Flow-Control-Value": "parallelism=6, rate=5",
579+
"Upstash-Flow-Control-Value": "parallelism=6, rate=5, period=30s",
578580
});
579581
});
580582

0 commit comments

Comments
 (0)