Commit dd62e38

Refactor: request hash from peers in a more efficient way
1 parent 93c89f3 commit dd62e38

File tree

1 file changed: +62 -27 lines

src/aleph/services/p2p/http.py

Lines changed: 62 additions & 27 deletions
@@ -12,23 +12,14 @@
 
 LOGGER = logging.getLogger("P2P.HTTP")
 
-SESSIONS = dict()
-
-
-async def api_get_request(base_uri, method, timeout=1):
-    if timeout not in SESSIONS:
-        connector = aiohttp.TCPConnector(limit_per_host=5)
-        SESSIONS[timeout] = aiohttp.ClientSession(
-            read_timeout=timeout, connector=connector
-        )
 
+async def api_get_request(session: aiohttp.ClientSession, base_uri, method, timeout=1):
     uri = f"{base_uri}/api/v0/{method}"
     try:
-        async with SESSIONS[timeout].get(uri) as resp:
+        async with session.get(uri) as resp:
             if resp.status != 200:
-                result = None
-            else:
-                result = await resp.json()
+                return None
+            return await resp.json()
     except (
         TimeoutError,
         asyncio.TimeoutError,
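
For context (not part of the commit): with the module-level SESSIONS cache gone, callers now create and own the ClientSession. Below is a minimal sketch of how the refactored api_get_request might be exercised; the peer URI and the "info" method are placeholders, and the import path is assumed from the file shown above.

import asyncio

import aiohttp

from aleph.services.p2p.http import api_get_request  # import path assumed from the diff


async def main():
    # The caller creates and owns the session instead of the old SESSIONS cache
    timeout_conf = aiohttp.ClientTimeout(total=1)
    async with aiohttp.ClientSession(timeout=timeout_conf) as session:
        # Hypothetical peer URI and API method, for illustration only
        result = await api_get_request(
            session=session,
            base_uri="http://peer.example:4024",
            method="info",
        )
        print(result)


asyncio.run(main())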
@@ -44,27 +35,71 @@ async def api_get_request(base_uri, method, timeout=1):
 
 
 async def get_peer_hash_content(
-    base_uri: str, item_hash: str, timeout: int = 1
+    session: aiohttp.ClientSession,
+    base_uri: str,
+    item_hash: str,
+    semaphore,
+    timeout: int = 1,
 ) -> Optional[bytes]:
-    result = None
-    item = await api_get_request(base_uri, f"storage/{item_hash}", timeout=timeout)
-    if item is not None and item["status"] == "success" and item["content"] is not None:
-        # TODO: IMPORTANT /!\ verify the hash of received data!
-        return base64.decodebytes(item["content"].encode("utf-8"))
-    else:
-        LOGGER.debug(f"can't get hash {item_hash}")
-
-    return result
+    async with (
+        semaphore
+    ):  # Use a semaphore to avoid making too many calls at the same time
+        result = None
+        item = await api_get_request(
+            session=session,
+            base_uri=base_uri,
+            method=f"storage/{item_hash}",
+            timeout=timeout,
+        )
+        if (
+            item is not None
+            and item["status"] == "success"
+            and item["content"] is not None
+        ):
+            # TODO: IMPORTANT /!\ verify the hash of received data!
+            return base64.decodebytes(item["content"].encode("utf-8"))
+        else:
+            LOGGER.debug(f"can't get hash {item_hash}")
+        return result
 
 
 async def request_hash(
     api_servers: Sequence[str], item_hash: str, timeout: int = 1
 ) -> Optional[bytes]:
     uris: List[str] = sample(api_servers, k=len(api_servers))
 
-    for uri in uris:
-        content = await get_peer_hash_content(uri, item_hash, timeout=timeout)
-        if content is not None:
-            return content
+    # Avoid making too many requests at the same time
+    semaphore = asyncio.Semaphore(5)
+
+    connector = aiohttp.TCPConnector(limit_per_host=5)
+    timeout_conf = aiohttp.ClientTimeout(total=timeout)
+
+    async with aiohttp.ClientSession(
+        connector=connector, timeout=timeout_conf
+    ) as session:
+        # Use Tasks instead of bare coroutines so they can be cancelled later
+        tasks = [
+            asyncio.create_task(
+                get_peer_hash_content(
+                    session=session,
+                    base_uri=url,
+                    item_hash=item_hash,
+                    semaphore=semaphore,
+                )
+            )
+            for url in uris
+        ]
+
+        for completed_task in asyncio.as_completed(tasks):
+            try:
+                result = await completed_task
+                if result:
+                    # Cancel the remaining tasks
+                    for task in tasks:
+                        if not task.done():
+                            task.cancel()
+                    return result
+            except Exception:
+                continue
 
     return None  # Nothing found...
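
For context (not part of the commit): a minimal sketch of driving the now-concurrent request_hash from a synchronous entry point. The peer URLs and item hash are placeholders, and the import path is assumed from the file shown above.

import asyncio

from aleph.services.p2p.http import request_hash  # import path assumed from the diff

# Placeholder peer endpoints and item hash, for illustration only
API_SERVERS = ["http://peer-1.example:4024", "http://peer-2.example:4024"]
ITEM_HASH = "QmExampleHash"  # placeholder


async def main():
    content = await request_hash(API_SERVERS, ITEM_HASH, timeout=2)
    if content is None:
        print("hash not found on any peer")
    else:
        print(f"received {len(content)} bytes")


asyncio.run(main())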
