@@ -543,33 +543,39 @@ let stream_to_fd ?(byte_size = Int.max_int) fd stream name =
543
543
let open Lwt.Infix in
544
544
let exception Malformed_data of string in
545
545
Lwt. catch (fun () ->
546
- let rec read_more size =
546
+ let rec read_more fd stream size =
547
547
Lwt_stream. get stream >>= function
548
548
| None -> Lwt. return (Ok () )
549
549
| Some `Data data ->
550
550
if String. length data > byte_size - size then begin
551
551
Logs. err (fun m -> m "stream exceeds size" );
552
552
Lwt. return (Error (`Msg "stream exceeds size" ))
553
553
end else
554
- let rec write off =
555
- Lwt_unix. write fd ( Bytes. unsafe_of_string data ) off
556
- ( String. length data - off ) >>= fun written ->
557
- if written = String. length data - off then
558
- read_more (size + String. length data )
554
+ let rec write fd data off len =
555
+ let to_write = len - off in
556
+ Lwt_unix. write fd data off to_write >>= fun written ->
557
+ if written = to_write then
558
+ read_more fd stream (size + len )
559
559
else
560
- write (off + written )
560
+ write fd data (off + written ) len
561
561
in
562
- write 0
562
+ write fd ( Bytes. unsafe_of_string data ) 0 ( String. length data )
563
563
| Some `Malformed msg ->
564
564
raise (Malformed_data msg )
565
565
in
566
- read_more 0 >>= fun r ->
566
+ read_more fd stream 0 >>= fun r ->
567
567
safe_close fd >|= fun () ->
568
568
r )
569
569
(fun e ->
570
570
Logs. err (fun m -> m "error writing %a: %s"
571
571
Fpath. pp name (Printexc. to_string e ));
572
- safe_close fd >|= fun () ->
572
+ safe_close fd >>= fun () ->
573
+ let rec drop_stream s =
574
+ Lwt_stream. get s >>= function
575
+ | None -> Lwt. return_unit
576
+ | Some _ -> drop_stream s
577
+ in
578
+ drop_stream stream >|= fun () ->
573
579
Error (`Msg "error writing to file descriptor" ))
574
580
575
581
let stream_to_block ~size ~byte_size stream name =
0 commit comments