Skip to content

Commit b714bed

Browse files
committed
Fix progress reporting causing stream data to be consumed prior to fetch
1 parent a854843 commit b714bed

File tree

3 files changed

+39
-16
lines changed

3 files changed

+39
-16
lines changed

src/private/UploadManagerBase.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ export abstract class UploadManagerBase<TSource, TInit> implements UploadManager
9292
part: UploadPart,
9393
contentLength: number,
9494
source: TSource,
95-
onProgress: (bytesSentDelta: number) => void,
95+
onProgress: (totalBytesTransferred: number) => void,
9696
addCancellationHandler: AddCancellationHandler
9797
): Promise<PutUploadPartResult>;
9898

src/public/node/ProgressStream.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { Transform, TransformCallback, TransformOptions } from "stream";
2+
3+
interface ProgressStreamOptions extends TransformOptions {
4+
/**
5+
* Optional callback to handle progress updates.
6+
* @param totalBytesTransferred - The total number of bytes transferred so far.
7+
*/
8+
onProgress?: (totalBytesTransferred: number) => void;
9+
}
10+
11+
export class ProgressStream extends Transform {
12+
private readonly onProgress: ((bytesTransferred: number) => void) | undefined;
13+
private totalBytes: number = 0;
14+
15+
constructor(options?: ProgressStreamOptions) {
16+
super(options);
17+
this.onProgress = options?.onProgress;
18+
}
19+
20+
_transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback): void {
21+
this.totalBytes += chunk.length;
22+
23+
// Invoke the progress callback if provided
24+
if (this.onProgress !== undefined) {
25+
this.onProgress(this.totalBytes);
26+
}
27+
28+
// Pass the chunk along
29+
this.push(chunk);
30+
callback();
31+
}
32+
}

src/public/node/UploadManagerNode.ts

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { AddCancellationHandler } from "../../private/model/AddCancellationHandl
99
import { StreamUtils } from "../../private/StreamUtils";
1010
import { BlobLike, UploadManagerParams, UploadSource } from "../shared/CommonTypes";
1111
import { UploadManagerFetchUtils } from "../../private/UploadManagerFetchUtils";
12+
import { ProgressStream } from "./ProgressStream";
1213

1314
type UploadManagerNodeInit =
1415
| undefined
@@ -96,27 +97,17 @@ export class UploadManager extends UploadManagerBase<UploadSourceProcessedNode,
9697
part: UploadPart,
9798
contentLength: number,
9899
source: UploadSourceProcessedNode,
99-
onProgress: (bytesSentDelta: number) => void,
100+
onProgress: (totalBytesTransferred: number) => void,
100101
addCancellationHandler: AddCancellationHandler
101102
): Promise<{ etag: string | undefined; status: number }> {
102-
// Report progress:
103-
let hasWarned = false;
104-
let bytesSent = 0;
105-
const stream = await this.sliceDataForRequest(source, part);
106-
const streamWithProgress = stream.on("data", (data: Partial<Buffer>) => {
107-
if (data.byteLength !== undefined) {
108-
bytesSent += data.byteLength;
109-
onProgress(bytesSent);
110-
} else if (!hasWarned) {
111-
hasWarned = true;
112-
console.warn("Expected stream to contain buffers, but it did not, so upload progress won't be reported.");
113-
}
114-
});
103+
const inputStream = await this.sliceDataForRequest(source, part);
104+
const progressStream = new ProgressStream({ onProgress });
105+
inputStream.pipe(progressStream);
115106

116107
return await UploadManagerFetchUtils.doPutUploadPart(
117108
this.config,
118109
part,
119-
this.coerceRequestBody(streamWithProgress),
110+
this.coerceRequestBody(progressStream),
120111
contentLength,
121112
addCancellationHandler
122113
);

0 commit comments

Comments
 (0)