Skip to content

Commit 33713f4

Browse files
authored
Fix executor blocking (#279)
1 parent 9110055 commit 33713f4

File tree

1 file changed

+25
-6
lines changed

1 file changed

+25
-6
lines changed

server/src/event.rs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -213,13 +213,30 @@ fn commit_schema(
213213
.expect("map of schemas is serializable");
214214
// try to put to storage
215215
let storage = CONFIG.storage().get_object_store();
216-
let res = futures::executor::block_on(storage.put_schema_map(stream_name, &schema_map));
216+
217+
let _stream_name = stream_name.to_owned();
218+
let handle = std::thread::spawn(move || {
219+
let rt = actix_web::rt::System::new();
220+
rt.block_on(storage.put_schema_map(&_stream_name, &schema_map))
221+
});
222+
223+
let res = match handle.join() {
224+
Ok(res) => res.map_err(EventError::ObjectStorage),
225+
Err(_) => {
226+
log::error!("commit schema thread panicked");
227+
Err(EventError::InternalError)
228+
}
229+
};
217230
// revert if err
218-
if res.is_err() {
219-
stream_metadata.remove_unchecked(stream_name, schema_key)
231+
if let Err(ref err) = res {
232+
stream_metadata.remove_unchecked(stream_name, schema_key);
233+
log::error!(
234+
"Failed to commit schema during new event ingestion: {}",
235+
err
236+
)
220237
}
221-
// return result
222-
res.map_err(|err| err.into())
238+
239+
res
223240
}
224241
}
225242

@@ -288,8 +305,10 @@ pub mod error {
288305
Arrow(#[from] ArrowError),
289306
#[error("Schema Mismatch")]
290307
SchemaMismatch,
291-
#[error("Schema Mismatch: {0}")]
308+
#[error("ObjectStorage Error: {0}")]
292309
ObjectStorage(#[from] ObjectStorageError),
310+
#[error("Internal Error")]
311+
InternalError,
293312
}
294313
}
295314

0 commit comments

Comments
 (0)