@@ -155,6 +155,16 @@ public virtual ElasticsearchResponse<TReturn> Request<TReturn>(RequestData reque
155
155
return builder . ToResponse ( ) ;
156
156
}
157
157
158
+
159
+ private static void RegisterApmTaskTimeout ( IAsyncResult result , WebRequest request , RequestData requestData ) =>
160
+ ThreadPool . RegisterWaitForSingleObject ( result . AsyncWaitHandle , TimeoutCallback , request , requestData . RequestTimeout , true ) ;
161
+
162
+ private static void TimeoutCallback ( object state , bool timedOut )
163
+ {
164
+ if ( ! timedOut ) return ;
165
+ ( state as WebRequest ) ? . Abort ( ) ;
166
+ }
167
+
158
168
public virtual async Task < ElasticsearchResponse < TReturn > > RequestAsync < TReturn > ( RequestData requestData , CancellationToken cancellationToken ) where TReturn : class
159
169
{
160
170
var builder = new ResponseBuilder < TReturn > ( requestData , cancellationToken ) ;
@@ -165,7 +175,10 @@ public virtual async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(
165
175
166
176
if ( data != null )
167
177
{
168
- using ( var stream = await request . GetRequestStreamAsync ( ) . ConfigureAwait ( false ) )
178
+ var apmGetRequestStreamTask = Task . Factory . FromAsync ( request . BeginGetRequestStream , request . EndGetRequestStream , null ) ;
179
+ RegisterApmTaskTimeout ( apmGetRequestStreamTask , request , requestData ) ;
180
+
181
+ using ( var stream = await apmGetRequestStreamTask . ConfigureAwait ( false ) )
169
182
{
170
183
if ( requestData . HttpCompression )
171
184
using ( var zipStream = new GZipStream ( stream , CompressionMode . Compress ) )
@@ -180,7 +193,10 @@ public virtual async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(
180
193
//Either the stream or the response object needs to be closed but not both although it won't
181
194
//throw any errors if both are closed atleast one of them has to be Closed.
182
195
//Since we expose the stream we let closing the stream determining when to close the connection
183
- var response = ( HttpWebResponse ) ( await request . GetResponseAsync ( ) . ConfigureAwait ( false ) ) ;
196
+
197
+ var apmGetResponseTask = Task . Factory . FromAsync ( request . BeginGetResponse , request . EndGetResponse , null ) ;
198
+ RegisterApmTaskTimeout ( apmGetResponseTask , request , requestData ) ;
199
+ var response = ( HttpWebResponse ) ( await apmGetResponseTask . ConfigureAwait ( false ) ) ;
184
200
builder . StatusCode = ( int ) response . StatusCode ;
185
201
builder . Stream = response . GetResponseStream ( ) ;
186
202
// https://github.com/elastic/elasticsearch-net/issues/2311
0 commit comments