Skip to content

Commit 22e51c0

Browse files
committed
CQ: Retry opening write file when flushing buffers
On Windows the file may be in "DELETE PENDING" state following its deletion (when the last message was acked). A subsequent message leads us to writing to that file again but we can't and get an {error,eacces}. In that case we wait 10ms and retry up to 3 times.
1 parent e019a4e commit 22e51c0

File tree

1 file changed

+25
-5
lines changed

1 file changed

+25
-5
lines changed

deps/rabbit/src/rabbit_classic_queue_store_v2.erl

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,25 @@ maybe_flush_buffer(State = #qs{ write_buffer_size = WriteBufferSize }) ->
194194
false -> State
195195
end.
196196

197+
open_eventually(File, Modes) ->
198+
open_eventually(File, Modes, 3).
199+
200+
open_eventually(_, _, 0) ->
201+
{error, eacces};
202+
open_eventually(File, Modes, N) ->
203+
case file:open(File, Modes) of
204+
OK = {ok, _} ->
205+
OK;
206+
%% When the current write file was recently deleted it
207+
%% is possible on Windows to get an {error,eacces}.
208+
%% Sometimes Windows sets the files to "DELETE PENDING"
209+
%% state and delays deletion a bit. So we wait 10ms and
210+
%% try again up to 3 times.
211+
{error, eacces} ->
212+
timer:sleep(10),
213+
open_eventually(File, Modes, N - 1)
214+
end.
215+
197216
flush_buffer(State = #qs{ write_buffer_size = 0 }, _) ->
198217
State;
199218
flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }, FsyncFun) ->
@@ -204,7 +223,7 @@ flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }, FsyncFun) ->
204223
Writes = flush_buffer_build(WriteList, CheckCRC32, SegmentEntryCount),
205224
%% Then we do the writes for each segment.
206225
State = lists:foldl(fun({Segment, LocBytes}, FoldState) ->
207-
{ok, Fd} = file:open(segment_file(Segment, FoldState), [read, write, raw, binary]),
226+
{ok, Fd} = open_eventually(segment_file(Segment, FoldState), [read, write, raw, binary]),
208227
case file:position(Fd, eof) of
209228
{ok, 0} ->
210229
%% We write the file header if it does not exist.
@@ -509,10 +528,11 @@ remove(SeqId, State = #qs{ write_buffer = WriteBuffer0,
509528
delete_segments([], State) ->
510529
?DEBUG("[] ~0p", [State]),
511530
State;
512-
delete_segments(Segments, State0 = #qs{ write_buffer = WriteBuffer0,
513-
write_buffer_size = WriteBufferSize0,
514-
read_segment = ReadSegment,
515-
read_fd = ReadFd }) ->
531+
delete_segments(Segments, State0 = #qs{ write_segment = WriteSegment,
532+
write_buffer = WriteBuffer0,
533+
write_buffer_size = WriteBufferSize0,
534+
read_segment = ReadSegment,
535+
read_fd = ReadFd }) ->
516536
?DEBUG("~0p ~0p", [Segments, State0]),
517537
%% First we have to close fds for the segments, if any.
518538
%% 'undefined' is never in Segments so we don't

0 commit comments

Comments
 (0)