Skip to content

Commit 99cad7f

Browse files
committed
fix(sessions): use static session pool
This should prevent nested transactions after checking out the session from the pool.
1 parent 388e168 commit 99cad7f

File tree

3 files changed

+94
-139
lines changed

3 files changed

+94
-139
lines changed

lib/mongo/protocol.ex

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -172,42 +172,16 @@ defmodule Mongo.Protocol do
172172
end
173173

174174
@impl DBConnection
175-
def handle_begin(opts, %{topology_pid: pid} = state) do
176-
with {:ok, session} <- Mongo.start_session(pid, opts),
177-
:ok <- Mongo.Session.start_transaction(session, opts) do
178-
{:ok, :ok, %{state | session: session}}
179-
else
180-
_ ->
181-
{:idle, state}
182-
end
183-
end
175+
def handle_begin(_opts, state), do: {:ok, nil, state}
184176

185177
@impl DBConnection
186-
def handle_commit(_opts, state) do
187-
with false <- is_nil(state.session),
188-
:ok <- Mongo.Session.commit_transaction(state.session),
189-
:ok <- Mongo.Session.end_session(state.session) do
190-
{:ok, :ok, %{state | session: nil}}
191-
else
192-
_ -> {:idle, state}
193-
end
194-
end
178+
def handle_commit(_opts, state), do: {:ok, nil, state}
195179

196180
@impl DBConnection
197-
def handle_rollback(_opts, state) do
198-
with false <- is_nil(state.session),
199-
:ok <- Mongo.Session.abort_transaction(state.session),
200-
:ok <- Mongo.Session.end_session(state.session) do
201-
{:ok, :ok, %{state | session: nil}}
202-
else
203-
_ -> {:idle, state}
204-
end
205-
end
181+
def handle_rollback(_opts, state), do: {:ok, nil, state}
206182

207183
@impl DBConnection
208-
def handle_close(_query, _opts, state) do
209-
{:ok, nil, state}
210-
end
184+
def handle_close(_query, _opts, state), do: {:ok, nil, state}
211185

212186
@impl DBConnection
213187
def handle_deallocate(_query, _cursor, _opts, state) do

lib/mongo/session.ex

Lines changed: 43 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
defmodule Mongo.Session do
2-
@enforce_keys [:id, :pid]
2+
@enforce_keys [:session, :pid]
33
defstruct @enforce_keys ++
44
[
55
:ref,
@@ -9,11 +9,7 @@ defmodule Mongo.Session do
99
operation_time: nil,
1010
causal_consistency: true,
1111
retry_writes: false,
12-
txn: %{
13-
seq: 0,
14-
read_concern: nil,
15-
write_concern: nil
16-
}
12+
active_txn: nil
1713
]
1814

1915
@opaque session :: pid()
@@ -24,8 +20,8 @@ defmodule Mongo.Session do
2420
defmodule Supervisor do
2521
@moduledoc false
2622

27-
def start_child(conn, id, opts) do
28-
DynamicSupervisor.start_child(__MODULE__, {Mongo.Session, {conn, id, opts, self()}})
23+
def start_child(conn, session, opts, parent) do
24+
DynamicSupervisor.start_child(__MODULE__, {Mongo.Session, {conn, session, opts, parent}})
2925
end
3026

3127
def child_spec(_) do
@@ -66,9 +62,9 @@ defmodule Mongo.Session do
6662
"""
6763
@spec end_session(session()) :: :ok
6864
def end_session(pid) do
69-
Mongo.SessionPool.checkin(pid)
70-
71-
:ok
65+
with {:ok, id, txn} <- :gen_statem.call(pid, :end_session) do
66+
Mongo.SessionPool.checkin(id, txn)
67+
end
7268
end
7369

7470
@doc """
@@ -105,25 +101,6 @@ defmodule Mongo.Session do
105101
:gen_statem.call(pid, {:advance_operation_time, timestamp})
106102
end
107103

108-
@doc false
109-
def set_options(pid, opts) do
110-
causal_consistency = Keyword.get(opts, :causal_consistency, true)
111-
read_concern = Keyword.get(opts, :read_concern, %{})
112-
read_preference = Keyword.get(opts, :read_preference)
113-
retry_writes = Keyword.get(opts, :retry_writes, true)
114-
write_concern = Keyword.get(opts, :write_concern)
115-
116-
state = %{
117-
causal_consistency: causal_consistency,
118-
read_concern: read_concern,
119-
read_preference: read_preference,
120-
retry_writes: retry_writes,
121-
write_concern: write_concern
122-
}
123-
124-
:gen_statem.call(pid, {:set_options, state})
125-
end
126-
127104
@doc false
128105
def update_session(doc, nil), do: doc
129106

@@ -151,15 +128,15 @@ defmodule Mongo.Session do
151128
@outside_txn @states -- @in_txn
152129

153130
@doc false
154-
def child_spec({topology_pid, id, opts, parent}) do
131+
def child_spec({topology_pid, session, opts, parent}) do
155132
causal_consistency = Keyword.get(opts, :causal_consistency, true)
156133
read_concern = Keyword.get(opts, :read_concern, %{})
157134
read_preference = Keyword.get(opts, :read_preference)
158135
retry_writes = Keyword.get(opts, :retry_writes, true)
159136
write_concern = Keyword.get(opts, :write_concern)
160137

161138
state = %__MODULE__{
162-
id: id,
139+
session: session,
163140
pid: topology_pid,
164141
causal_consistency: causal_consistency,
165142
read_concern: read_concern,
@@ -200,25 +177,27 @@ defmodule Mongo.Session do
200177
end
201178

202179
# Start new transaction if there isn't one already.
203-
def handle_event({:call, from}, {:start_transaction, opts}, state, %{txn: %{seq: seq}} = data)
180+
def handle_event({:call, from}, {:start_transaction, opts}, state, %{session: session} = data)
204181
when state in @outside_txn do
205182
write_concern = Keyword.get(opts, :write_concern, data.write_concern)
206183
read_concern = Keyword.get(opts, :read_concern, data.read_concern)
207184

208185
txn = %{
209-
seq: seq + 1,
210186
write_concern: write_concern,
211187
read_concern: read_concern
212188
}
213189

214-
{:next_state, :transaction_started, struct(data, txn: txn), {:reply, from, :ok}}
190+
session = Map.update!(session, :txn, & &1 + 1)
191+
192+
{:next_state, :transaction_started, struct(data, session: session, active_txn: txn),
193+
{:reply, from, :ok}}
215194
end
216195

217196
# Add session information to the query metadata.
218197
def handle_event({:call, from}, {:add_session, query}, :transaction_started, data) do
219198
%{
220-
txn: %{
221-
seq: seq,
199+
session: %{txn: seq, id: id} = session,
200+
active_txn: %{
222201
read_concern: read_concern,
223202
write_concern: write_concern
224203
}
@@ -227,33 +206,39 @@ defmodule Mongo.Session do
227206
new_query =
228207
query
229208
|> Keyword.new()
230-
|> add_option(:lsid, data.id)
209+
|> add_option(:lsid, %{id: id})
231210
|> add_option(:txnNumber, {:long, seq})
232211
|> add_option(:startTransaction, true)
233212
|> add_option(:autocommit, false)
234213
|> add_option(:writeConcern, write_concern)
235214
|> add_option(:readConcern, read_concern)
236215
|> set_read_concern(data.operation_time, data.causal_consistency)
237216

238-
{:next_state, :in_transaction, data, {:reply, from, {:ok, new_query}}}
217+
session = Map.put(session, :last_use, :erlang.monotonic_time())
218+
219+
{:next_state, :in_transaction, struct(data, session: session),
220+
{:reply, from, {:ok, new_query}}}
239221
end
240222

241223
def handle_event({:call, from}, {:add_session, query}, :in_transaction, data) do
242224
new_query =
243225
query
244226
|> Keyword.new()
245227
|> Keyword.merge(
246-
lsid: data.id,
247-
txnNumber: {:long, data.txn},
228+
lsid: %{id: data.session.id},
229+
txnNumber: {:long, data.session.txn},
248230
autocommit: false
249231
)
250232

233+
session = Map.put(data.session, :last_use, :erlang.monotonic_time())
234+
data = struct(data, session: session)
235+
251236
case Keyword.get(new_query, :read_preference, %{mode: :primary}) do
252237
%{mode: :primary} ->
253-
{:keep_state_and_data, {:reply, from, {:ok, new_query}}}
238+
{:keep_state, data, {:reply, from, {:ok, new_query}}}
254239

255240
%{mode: mode} ->
256-
{:keep_state_and_data,
241+
{:keep_state, data,
257242
{:reply, from,
258243
{:error,
259244
Mongo.Error.exception(message: "Read preference must be primary, not: #{mode}")}}}
@@ -267,7 +252,7 @@ defmodule Mongo.Session do
267252
new_query =
268253
query
269254
|> Keyword.new()
270-
|> add_option(:lsid, data.id)
255+
|> add_option(:lsid, data.session.id)
271256
|> set_read_concern(data.operation_time, data.causal_consistency)
272257

273258
{:next_state, :no_transaction, data, {:reply, from, {:ok, new_query}}}
@@ -303,8 +288,13 @@ defmodule Mongo.Session do
303288

304289
# Finish session by ending process (for further "closing" see `terminate/3`
305290
# handler.
306-
def handle_event({:call, from}, :end_session, _state, _data) do
307-
{:stop_and_reply, :normal, {:reply, from, :ok}}
291+
def handle_event({:call, from}, :end_session, state, %{session: session} = data) do
292+
_ =
293+
if state == :in_transaction do
294+
try_run_txn_command(data, :abortTransaction)
295+
end
296+
297+
{:stop_and_reply, :normal, [{:reply, from, {:ok, session}}]}
308298
end
309299

310300
def handle_event({:call, from}, {:advance_operation_time, timestamp}, _state, data) do
@@ -318,10 +308,6 @@ defmodule Mongo.Session do
318308
end
319309
end
320310

321-
def handle_event({:call, from}, {:set_options, opts}, _state, data) do
322-
{:keep_state, struct(data, opts), {:reply, from, :ok}}
323-
end
324-
325311
# If parent process died before session then stop process and handle aborting
326312
# sessions in `terminate/3` handler.
327313
def handle_event(:info, {:DOWN, ref, :process, _pid, _reason}, _state, %{ref: ref}) do
@@ -339,12 +325,13 @@ defmodule Mongo.Session do
339325
@impl :gen_statem
340326
# Abort all pending transactions if there any and end session itself.
341327
def terminate(_reason, state, %{pid: pid} = data) do
342-
if state == :in_transaction do
343-
_ = try_run_txn_command(data, :abortTransaction)
344-
end
328+
_ =
329+
if state == :in_transaction do
330+
try_run_txn_command(data, :abortTransaction)
331+
end
345332

346333
query = %{
347-
endSessions: [data.id]
334+
endSessions: [data.session.id]
348335
}
349336

350337
with {:ok, conn, _, _} <- Mongo.select_server(pid, :write, []),
@@ -380,9 +367,9 @@ defmodule Mongo.Session do
380367
query =
381368
[
382369
{command, 1},
383-
lsid: state.id,
370+
lsid: %{id: state.session.id},
384371
autocommit: false,
385-
txnNumber: {:long, state.txn.seq}
372+
txnNumber: {:long, state.session.txn}
386373
]
387374
|> add_option(:writeConcern, state.write_concern)
388375

0 commit comments

Comments
 (0)