Skip to content

Commit d439b62

Browse files
committed
Made all long-running requests asynchronous
1 parent 991b1b0 commit d439b62

File tree

1 file changed

+35
-15
lines changed

1 file changed

+35
-15
lines changed

src/salesforce_bulk_python/bulk.py

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import asyncio
66
from typing import List
77
from abc import ABC, abstractmethod
8-
import queue
8+
from datetime import datetime
99

1010
from requests.models import HTTPError
1111

@@ -110,19 +110,25 @@ def __init__(self,object:SalesforceObject,connection:BulkAPIConnection) -> None:
110110
self.object = object
111111
self.id = None
112112

113-
def status(self):
114-
req=requests.get(f"{self.connection.instance_url}/services/data/{self.connection.settings.api_version}/jobs/query/{self.id}",headers=self.connection.headers)
113+
async def status(self):
114+
loop = asyncio.get_event_loop()
115+
f3=loop.run_in_executor(None,lambda:requests.get(f"{self.connection.instance_url}/services/data/{self.connection.settings.api_version}/jobs/query/{self.id}",headers=self.connection.headers))
116+
req = await f3
115117
req.raise_for_status()
116118
return req.json()['state']
117119

118120

119121
async def start(self):
120122
print(f'Starting job for {self.object.name}')
121-
req=requests.post(
123+
124+
loop = asyncio.get_event_loop()
125+
f1=loop.run_in_executor(None,lambda: requests.post(
122126
f"{self.connection.instance_url}/services/data/{self.connection.settings.api_version}/jobs/query",
123127
data=json.dumps(self.body),
124128
headers=self.connection.headers
125-
)
129+
))
130+
req = await f1
131+
126132
try:
127133
req.raise_for_status()
128134
except HTTPError as e:
@@ -136,11 +142,21 @@ async def start(self):
136142
raise
137143

138144
self.id = req.json()['id']
145+
delay_iter = iter([1,1,10,30,60])
146+
139147
while True:
140-
await asyncio.sleep(1)
141-
status = self.status()
148+
try:
149+
delay = next(delay_iter)
150+
except StopIteration:
151+
delay = delay
152+
153+
await asyncio.sleep(delay)
154+
155+
status = await self.status()
156+
print(f'{self.object.name}: {status}')
157+
142158
if status=='JobComplete':
143-
self.on_complete(f"{self.connection.instance_url}/services/data/{self.connection.settings.api_version}/jobs/query/{self.id}/results",self)
159+
await self.on_complete(f"{self.connection.instance_url}/services/data/{self.connection.settings.api_version}/jobs/query/{self.id}/results",self)
144160
print(f'Finished job for {self.object.name}')
145161
break
146162
return 0
@@ -187,13 +203,18 @@ def __init__(self,result_url:str,job:BulkAPIJob) -> None:
187203
self.batch_number = 0
188204
self.job = job
189205

190-
def fetch(self):
191-
result = requests.get(f"{self.result_url}?maxRecords=10000",headers=self.job.connection.headers)
206+
async def fetch(self):
207+
loop = asyncio.get_event_loop()
208+
f1=loop.run_in_executor(None,lambda: requests.get(f"{self.result_url}?maxRecords=50000",headers=self.job.connection.headers))
209+
210+
self.datetime_start_fetch = datetime.now()
211+
result = await f1
192212
self.handle(result)
193213
while 'sforce-locator' in result.headers.keys():
194214
if (result.headers['sforce-locator']!='NA') & (result.headers['sforce-locator']!='null'):
195215
self.batch_number += 1
196-
result=requests.get(f"{self.result_url}?locator={result.headers['sforce-locator']}&maxRecords=10000",headers=self.job.connection.headers)
216+
f2=loop.run_in_executor(None,lambda: requests.get(f"{self.result_url}?locator={result.headers['sforce-locator']}&maxRecords=50000",headers=self.job.connection.headers))
217+
result=await f2
197218
self.handle(result)
198219
else:
199220
break
@@ -205,10 +226,10 @@ def handle(self,data):
205226

206227
class JobCompleteEvent(List[BulkAPIResultHandler]):
207228

208-
def __call__(self, *args, **kwargs):
229+
async def __call__(self, *args, **kwargs):
209230
for c in self:
210231
i=c(*args,*kwargs)
211-
i.fetch()
232+
await i.fetch()
212233

213234
def __repr__(self):
214235
return "Event(%s)" % list.__repr__(self)
@@ -230,5 +251,4 @@ async def run_all(self):
230251
)
231252
else:
232253
break
233-
234-
254+

0 commit comments

Comments
 (0)