Skip to content

Commit c36c2a1

Browse files
committed
fix(core): Adjust connection timeout and retry logic
Do not use the session timeout as the connection timeout. That value is too large: a bad, non-responsive server can hold up the client long enough for the session to time out. Instead, use the KazooRetry object to apply an increasing backoff timeout and cycle over all servers quickly, working around bad servers with minimal impact.
1 parent 5225b3e commit c36c2a1

File tree

3 files changed

+115
-3
lines changed

3 files changed

+115
-3
lines changed

kazoo/protocol/connection.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,9 @@ def _connect_attempt(self, host, hostip, port, retry):
612612

613613
try:
614614
self._xid = 0
615-
read_timeout, connect_timeout = self._connect(host, hostip, port)
615+
read_timeout, connect_timeout = self._connect(
616+
host, hostip, port, timeout=retry.cur_delay
617+
)
616618
read_timeout = read_timeout / 1000.0
617619
connect_timeout = connect_timeout / 1000.0
618620
retry.reset()
@@ -683,7 +685,7 @@ def _connect_attempt(self, host, hostip, port, retry):
683685
if self._socket is not None:
684686
self._socket.close()
685687

686-
def _connect(self, host, hostip, port):
688+
def _connect(self, host, hostip, port, timeout):
687689
client = self.client
688690
self.logger.info(
689691
"Connecting to %s(%s):%s, use_ssl: %r",
@@ -703,7 +705,7 @@ def _connect(self, host, hostip, port):
703705
with self._socket_error_handling():
704706
self._socket = self.handler.create_connection(
705707
address=(hostip, port),
706-
timeout=client._session_timeout / 1000.0,
708+
timeout=timeout,
707709
use_ssl=self.client.use_ssl,
708710
keyfile=self.client.keyfile,
709711
certfile=self.client.certfile,

kazoo/retry.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ def copy(self):
109109
obj.retry_exceptions = self.retry_exceptions
110110
return obj
111111

112+
@property
113+
def cur_delay(self):
114+
return self._cur_delay
115+
112116
def __call__(self, func, *args, **kwargs):
113117
"""Call a function with arguments until it completes without
114118
throwing a Kazoo exception

kazoo/tests/test_connection.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
from unittest import mock
2+
3+
import pytest
4+
5+
from kazoo import retry
6+
from kazoo.handlers import threading
7+
from kazoo.protocol import connection
8+
from kazoo.protocol import states
9+
10+
11+
@mock.patch("kazoo.protocol.connection.ConnectionHandler._expand_client_hosts")
12+
def test_retry_logic(mock_expand):
13+
mock_client = mock.Mock()
14+
mock_client._state = states.KeeperState.CLOSED
15+
mock_client._session_id = None
16+
mock_client._session_passwd = b"\x00" * 16
17+
mock_client._stopped.is_set.return_value = False
18+
mock_client.handler.timeout_exception = threading.KazooTimeoutError
19+
mock_client.handler.create_connection.side_effect = (
20+
threading.KazooTimeoutError()
21+
)
22+
test_retry = retry.KazooRetry(
23+
max_tries=6,
24+
delay=1.0,
25+
backoff=2,
26+
max_delay=30.0,
27+
max_jitter=0.0,
28+
sleep_func=lambda _x: None,
29+
)
30+
test_cnx = connection.ConnectionHandler(
31+
client=mock_client,
32+
retry_sleeper=test_retry,
33+
)
34+
mock_expand.return_value = [
35+
("a", "1.1.1.1", 2181),
36+
("b", "2.2.2.2", 2181),
37+
("c", "3.3.3.3", 2181),
38+
]
39+
40+
with pytest.raises(retry.RetryFailedError):
41+
test_retry(test_cnx._connect_loop, test_retry)
42+
43+
assert mock_client.handler.create_connection.call_args_list[:3] == [
44+
mock.call(
45+
address=("1.1.1.1", 2181),
46+
timeout=1.0,
47+
use_ssl=mock.ANY,
48+
keyfile=mock.ANY,
49+
certfile=mock.ANY,
50+
ca=mock.ANY,
51+
keyfile_password=mock.ANY,
52+
verify_certs=mock.ANY,
53+
),
54+
mock.call(
55+
address=("2.2.2.2", 2181),
56+
timeout=1.0,
57+
use_ssl=mock.ANY,
58+
keyfile=mock.ANY,
59+
certfile=mock.ANY,
60+
ca=mock.ANY,
61+
keyfile_password=mock.ANY,
62+
verify_certs=mock.ANY,
63+
),
64+
mock.call(
65+
address=("3.3.3.3", 2181),
66+
timeout=1.0,
67+
use_ssl=mock.ANY,
68+
keyfile=mock.ANY,
69+
certfile=mock.ANY,
70+
ca=mock.ANY,
71+
keyfile_password=mock.ANY,
72+
verify_certs=mock.ANY,
73+
),
74+
], "All hosts are first tried with the lowest timeout value"
75+
assert mock_client.handler.create_connection.call_args_list[-3:] == [
76+
mock.call(
77+
address=("1.1.1.1", 2181),
78+
timeout=30.0,
79+
use_ssl=mock.ANY,
80+
keyfile=mock.ANY,
81+
certfile=mock.ANY,
82+
ca=mock.ANY,
83+
keyfile_password=mock.ANY,
84+
verify_certs=mock.ANY,
85+
),
86+
mock.call(
87+
address=("2.2.2.2", 2181),
88+
timeout=30.0,
89+
use_ssl=mock.ANY,
90+
keyfile=mock.ANY,
91+
certfile=mock.ANY,
92+
ca=mock.ANY,
93+
keyfile_password=mock.ANY,
94+
verify_certs=mock.ANY,
95+
),
96+
mock.call(
97+
address=("3.3.3.3", 2181),
98+
timeout=30.0,
99+
use_ssl=mock.ANY,
100+
keyfile=mock.ANY,
101+
certfile=mock.ANY,
102+
ca=mock.ANY,
103+
keyfile_password=mock.ANY,
104+
verify_certs=mock.ANY,
105+
),
106+
], "All hosts are last tried with the highest timeout value"

0 commit comments

Comments
 (0)