Skip to content

Commit c2fd3bd

Browse files
committed
conats: completely migrate the hub-api over to use it... and it just works, first try!?
- wow.
1 parent efc0690 commit c2fd3bd

File tree

6 files changed

+83
-133
lines changed

6 files changed

+83
-133
lines changed

src/packages/frontend/nats/client.ts

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ import {
5555
} from "@cocalc/nats/client";
5656
import type { ConnectionInfo } from "./types";
5757
import { fromJS } from "immutable";
58-
import { requestMany } from "@cocalc/nats/service/many";
5958
import Cookies from "js-cookie";
6059
import { ACCOUNT_ID_COOKIE } from "@cocalc/frontend/client/client";
6160
import { isConnected, waitUntilConnected } from "@cocalc/nats/util";
@@ -271,31 +270,18 @@ export class NatsClient extends EventEmitter {
271270
name,
272271
args = [],
273272
timeout = DEFAULT_TIMEOUT,
274-
requestMany: requestMany0 = false,
275273
}: {
276274
service?: string;
277275
name: string;
278276
args: any[];
279277
timeout?: number;
280-
// requestMany -- if true do a requestMany request, which is more complicated/slower, but
281-
// supports arbitrarily large responses irregardless of the nats server max message size.
282-
requestMany?: boolean;
283278
}) => {
284-
const { nc } = await this.getEnv();
279+
const { cn } = await this.getEnv();
285280
const subject = `hub.account.${this.client.account_id}.${service}`;
286281
try {
287-
const data = this.jc.encode({
288-
name,
289-
args,
290-
});
291-
let resp;
292-
await waitUntilConnected();
293-
if (requestMany0) {
294-
resp = await requestMany({ nc, subject, data, maxWait: timeout });
295-
} else {
296-
resp = await nc.request(subject, data, { timeout });
297-
}
298-
return this.jc.decode(resp.data);
282+
const data = { name, args };
283+
const resp = await cn.request(subject, data, { timeout });
284+
return resp.data;
299285
} catch (err) {
300286
err.message = `${err.message} - callHub: subject='${subject}', name='${name}', `;
301287
throw err;

src/packages/nats/hub-api/db.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ export interface DB {
3333
getLegacyTimeTravelPatches: (opts: {
3434
account_id?: string;
3535
uuid: string;
36-
// you should set this to true to enable potentially very large response support
37-
requestMany?: boolean;
3836
// also, make this bigger:
3937
timeout?: number;
4038
}) => Promise<string>;

src/packages/nats/hub-api/index.ts

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,3 @@
1-
/*
2-
NOTE: If you need to send *very large responses* to a message or increase timeouts,
3-
see getLegacyTimeTravelPatches in db.ts. You just have to allow the keys requestMany
4-
and timeout to the *first* argument of the function (which must be an object).
5-
The framework will then automatically allow large responses when the user sets
6-
requestMany:true.
7-
*/
8-
91
import { isValidUUID } from "@cocalc/util/misc";
102
import { type Purchases, purchases } from "./purchases";
113
import { type System, system } from "./system";
@@ -47,7 +39,6 @@ export function initHubApi(callHubApi): HubApi {
4739
const resp = await callHubApi({
4840
name: `${group}.${functionName}`,
4941
args,
50-
requestMany: args[0]?.requestMany,
5142
timeout: args[0]?.timeout,
5243
});
5344
return handleErrorMessage(resp);

src/packages/nats/server/client.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -551,12 +551,12 @@ export class Message {
551551
this.subject = subject;
552552
}
553553

554-
respond = (data: any) => {
554+
respond = async (data: any) => {
555555
const subject = this.headers?.[REPLY_HEADER];
556556
if (!subject) {
557557
throw Error("message is not a request");
558558
}
559-
this.client.publish(subject, data);
559+
await this.client.publish(subject, data);
560560
};
561561
}
562562

src/packages/nats/server/server.ts

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@ For clustering:
1414
s0 = await require('@cocalc/server/nats/socketio').initConatServer({valkey:'redis://localhost:6379', port:3000})
1515
1616
s1 = await require('@cocalc/server/nats/socketio').initConatServer({valkey:'redis://localhost:6379', port:3001})
17+
18+
Corresponding clients:
19+
20+
c0 = require('@cocalc/nats/server/client').connect('http://localhost:3000')
21+
22+
c1 = require('@cocalc/nats/server/client').connect('http://localhost:3001')
23+
24+
---
1725
1826
Or from cocalc/src
1927
@@ -41,7 +49,14 @@ const MAX_PAYLOAD = 1 * MB;
4149

4250
const MAX_DISCONNECTION_DURATION = 2 * 60 * 1000;
4351

44-
const DEBUG = true;
52+
const DEBUG = false;
53+
54+
interface InterestUpdate {
55+
op: "add" | "delete";
56+
subject: string;
57+
queue?: string;
58+
room: string;
59+
}
4560

4661
export function init(opts) {
4762
return new ConatServer(opts);
@@ -167,15 +182,15 @@ export class ConatServer {
167182
let lastId = "0";
168183
let d = 50;
169184
while (true) {
170-
console.log("waiting for interest update");
185+
// console.log("waiting for interest update");
171186
const results = await this.valkey.sub.xread(
172187
"block" as any,
173188
0,
174189
"STREAMS",
175190
"interest",
176191
lastId,
177192
);
178-
console.log("got ", results);
193+
// console.log("got ", results);
179194
if (results == null) {
180195
d = Math.min(1000, d * 1.2);
181196
await delay(d);
@@ -189,7 +204,7 @@ export class ConatServer {
189204
this._updateInterest(update);
190205
}
191206
lastId = messages[messages.length - 1][0];
192-
console.log({ lastId });
207+
// console.log({ lastId });
193208
}
194209
};
195210

@@ -344,29 +359,22 @@ export class ConatServer {
344359
socket.on("disconnecting", async () => {
345360
const rooms = Array.from(socket.rooms) as string[];
346361
const d = this.options.maxDisconnectionDuration ?? 0;
347-
console.log(`will unsubscribe in ${d}ms unless client reconnects`);
362+
// console.log(`will unsubscribe in ${d}ms unless client reconnects`);
348363
await delay(d);
349364
if (!this.io.of("/").adapter.sids.has(id)) {
350-
console.log("client not back");
365+
// console.log("client not back");
351366
for (const room of rooms) {
352367
const subject = getSubjectFromRoom(room);
353368
this.unsubscribe({ socket, subject });
354369
}
355370
delete this.subscriptions[id];
356371
} else {
357-
console.log("client is back!");
372+
// console.log("client is back!");
358373
}
359374
});
360375
};
361376
}
362377

363-
interface InterestUpdate {
364-
op: "add" | "delete";
365-
subject: string;
366-
queue?: string;
367-
room: string;
368-
}
369-
370378
function getSubjectFromRoom(room: string) {
371379
if (room.startsWith("{")) {
372380
return JSON.parse(room).subject;

src/packages/server/nats/api/index.ts

Lines changed: 55 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ your dev hub after doing the above.
1212
1313
2. Run this script at the terminal:
1414
15-
echo "require('@cocalc/server/nats').default()" | COCALC_MODE='single-user' DEBUG_CONSOLE=yes DEBUG=cocalc:* node
15+
echo "require('@cocalc/server/nats/api').initAPI()" | COCALC_MODE='single-user' DEBUG_CONSOLE=yes DEBUG=cocalc:* node
1616
1717
1818
3. Optional: start more servers -- requests get randomly routed to exactly one of them:
@@ -40,26 +40,18 @@ To view all requests (and replies) in realtime:
4040
And remember to use the nats command, do "pnpm nats-cli" from cocalc/src.
4141
*/
4242

43-
import { JSONCodec } from "nats";
4443
import getLogger from "@cocalc/backend/logger";
4544
import { type HubApi, getUserId, transformArgs } from "@cocalc/nats/hub-api";
46-
import { getConnection } from "@cocalc/backend/nats";
45+
import { getEnv } from "@cocalc/backend/nats";
4746
import userIsInGroup from "@cocalc/server/accounts/is-in-group";
4847
import { terminate as terminateDatabase } from "@cocalc/database/nats/changefeeds";
4948
import { terminate as terminateChangefeedServer } from "@cocalc/nats/changefeed/server";
50-
import { Svcm } from "@nats-io/services";
5149
import { terminate as terminateAuth } from "@cocalc/server/nats/auth";
5250
import { terminate as terminateTieredStorage } from "@cocalc/server/nats/tiered-storage/api";
53-
import { respondMany } from "@cocalc/nats/service/many";
5451
import { delay } from "awaiting";
55-
import { waitUntilConnected } from "@cocalc/nats/util";
56-
57-
const MONITOR_INTERVAL = 30000;
5852

5953
const logger = getLogger("server:nats:api");
6054

61-
const jc = JSONCodec();
62-
6355
export function initAPI() {
6456
mainLoop();
6557
}
@@ -88,93 +80,68 @@ async function mainLoop() {
8880
}
8981
}
9082

91-
async function serviceMonitor({ nc, api, subject }) {
92-
while (!terminate) {
93-
logger.debug(`serviceMonitor: waiting ${MONITOR_INTERVAL}ms`);
94-
await delay(MONITOR_INTERVAL);
95-
try {
96-
await waitUntilConnected();
97-
await nc.request(subject, jc.encode({ name: "ping" }), {
98-
timeout: 7500,
99-
});
100-
logger.debug("serviceMonitor: ping succeeded");
101-
} catch (err) {
102-
logger.debug(
103-
`serviceMonitor: ping failed, so restarting service -- ${err}`,
104-
);
105-
api.stop();
106-
return;
107-
}
108-
}
109-
}
110-
11183
async function serve() {
11284
const subject = "hub.*.*.api";
11385
logger.debug(`initAPI -- subject='${subject}', options=`, {
11486
queue: "0",
11587
});
116-
const nc = await getConnection();
117-
// @ts-ignore
118-
const svcm = new Svcm(nc);
119-
120-
await waitUntilConnected();
121-
const service = await svcm.add({
122-
name: "hub-server",
123-
version: "0.1.0",
124-
description: "CoCalc Hub Server",
125-
});
126-
127-
const api = service.addEndpoint("api", { subject });
128-
serviceMonitor({ api, subject, nc });
129-
await listen({ api, subject });
88+
const { cn } = await getEnv();
89+
const api = await cn.subscribe(subject);
90+
for await (const mesg of api) {
91+
(async () => {
92+
try {
93+
await handleMessage({ api, subject, mesg });
94+
} catch (err) {
95+
logger.debug(`WARNING: unexpected error - ${err}`);
96+
}
97+
})();
98+
}
13099
}
131100

132-
async function listen({ api, subject }) {
133-
for await (const mesg of api) {
134-
const request = jc.decode(mesg.data) ?? ({} as any);
135-
if (request.name == "system.terminate") {
101+
async function handleMessage({ api, subject, mesg }) {
102+
const request = mesg.data ?? ({} as any);
103+
if (request.name == "system.terminate") {
104+
// special hook so admin can terminate handling. This is useful for development.
105+
const { account_id } = getUserId(mesg.subject);
106+
if (!(!!account_id && (await userIsInGroup(account_id, "admin")))) {
107+
mesg.respond({ error: "only admin can terminate" });
108+
return;
109+
}
110+
// TODO: could be part of handleApiRequest below, but done differently because
111+
// one case halts this loop
112+
const { service } = request.args[0] ?? {};
113+
logger.debug(`Terminate service '${service}'`);
114+
if (service == "db") {
115+
terminateDatabase();
116+
mesg.respond({ status: "terminated", service });
117+
return;
118+
} else if (service == "auth") {
119+
terminateAuth();
120+
mesg.respond({ status: "terminated", service });
121+
return;
122+
} else if (service == "tiered-storage") {
123+
terminateTieredStorage();
124+
mesg.respond({ status: "terminated", service });
125+
return;
126+
} else if (service == "changefeeds") {
127+
terminateChangefeedServer();
128+
mesg.respond({ status: "terminated", service });
129+
return;
130+
} else if (service == "api") {
136131
// special hook so admin can terminate handling. This is useful for development.
137-
const { account_id } = getUserId(mesg.subject);
138-
if (!(!!account_id && (await userIsInGroup(account_id, "admin")))) {
139-
mesg.respond(jc.encode({ error: "only admin can terminate" }));
140-
continue;
141-
}
142-
// TODO: could be part of handleApiRequest below, but done differently because
143-
// one case halts this loop
144-
const { service } = request.args[0] ?? {};
145-
logger.debug(`Terminate service '${service}'`);
146-
if (service == "db") {
147-
terminateDatabase();
148-
mesg.respond(jc.encode({ status: "terminated", service }));
149-
continue;
150-
} else if (service == "auth") {
151-
terminateAuth();
152-
mesg.respond(jc.encode({ status: "terminated", service }));
153-
continue;
154-
} else if (service == "tiered-storage") {
155-
terminateTieredStorage();
156-
mesg.respond(jc.encode({ status: "terminated", service }));
157-
continue;
158-
} else if (service == "changefeeds") {
159-
terminateChangefeedServer();
160-
mesg.respond(jc.encode({ status: "terminated", service }));
161-
continue;
162-
} else if (service == "api") {
163-
// special hook so admin can terminate handling. This is useful for development.
164-
console.warn("TERMINATING listening on ", subject);
165-
logger.debug("TERMINATING listening on ", subject);
166-
terminate = true;
167-
mesg.respond(jc.encode({ status: "terminated", service }));
168-
api.stop();
169-
return;
170-
} else {
171-
mesg.respond(jc.encode({ error: `Unknown service ${service}` }));
172-
}
132+
console.warn("TERMINATING listening on ", subject);
133+
logger.debug("TERMINATING listening on ", subject);
134+
terminate = true;
135+
mesg.respond({ status: "terminated", service });
136+
api.stop();
137+
return;
173138
} else {
174-
// we explicitly do NOT await this, since we want this hub server to handle
175-
// potentially many messages at once, not one at a time!
176-
handleApiRequest({ request, mesg });
139+
mesg.respond({ error: `Unknown service ${service}` });
177140
}
141+
} else {
142+
// we explicitly do NOT await this, since we want this hub server to handle
143+
// potentially many messages at once, not one at a time!
144+
handleApiRequest({ request, mesg });
178145
}
179146
}
180147

@@ -193,7 +160,7 @@ async function handleApiRequest({ request, mesg }) {
193160
resp = { error: `${err}` };
194161
}
195162
try {
196-
await respondMany({ mesg, data: jc.encode(resp) });
163+
await mesg.respond(resp);
197164
} catch (err) {
198165
// there's nothing we can do here, e.g., maybe NATS just died.
199166
logger.debug(

0 commit comments

Comments
 (0)