Skip to content

Commit cb9afce

Browse files
committed
process: Refactor process transport to handle late stdio init
1 parent 8f28da2 commit cb9afce

File tree

9 files changed

+267
-50
lines changed

9 files changed

+267
-50
lines changed

tests/test_process.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,92 @@ def cancel_make_transport():
464464

465465

466466
class Test_UV_Process(_TestProcess, tb.UVTestCase):
467+
468+
def test_process_lated_stdio_init(self):
469+
470+
class TestProto:
471+
def __init__(self):
472+
self.lost = 0
473+
self.stages = []
474+
475+
def connection_made(self, transport):
476+
self.stages.append(('CM', transport))
477+
478+
def pipe_data_received(self, fd, data):
479+
if fd == 1:
480+
self.stages.append(('STDOUT', data))
481+
482+
def pipe_connection_lost(self, fd, exc):
483+
if fd == 1:
484+
self.stages.append(('STDOUT', 'LOST'))
485+
486+
def process_exited(self):
487+
self.stages.append('PROC_EXIT')
488+
489+
def connection_lost(self, exc):
490+
self.stages.append(('CL', self.lost, exc))
491+
self.lost += 1
492+
493+
async def run(**kwargs):
494+
return await self.loop.subprocess_shell(
495+
lambda: TestProto(),
496+
'echo 1',
497+
**kwargs)
498+
499+
with self.subTest('paused, stdin pipe'):
500+
transport, proto = self.loop.run_until_complete(
501+
run(stdin=subprocess.PIPE,
502+
stdout=subprocess.PIPE,
503+
stderr=subprocess.PIPE,
504+
__uvloop_sleep_after_fork=True))
505+
self.assertIsNot(transport, None)
506+
self.assertEqual(transport.get_returncode(), 0)
507+
self.assertEqual(
508+
set(proto.stages),
509+
{
510+
('CM', transport),
511+
'PROC_EXIT',
512+
('STDOUT', b'1\n'),
513+
('STDOUT', 'LOST'),
514+
('CL', 0, None)
515+
})
516+
517+
with self.subTest('paused, no stdin'):
518+
transport, proto = self.loop.run_until_complete(
519+
run(stdin=None,
520+
stdout=subprocess.PIPE,
521+
stderr=subprocess.PIPE,
522+
__uvloop_sleep_after_fork=True))
523+
self.assertIsNot(transport, None)
524+
self.assertEqual(transport.get_returncode(), 0)
525+
self.assertEqual(
526+
set(proto.stages),
527+
{
528+
('CM', transport),
529+
'PROC_EXIT',
530+
('STDOUT', b'1\n'),
531+
('STDOUT', 'LOST'),
532+
('CL', 0, None)
533+
})
534+
535+
with self.subTest('no pause, no stdin'):
536+
transport, proto = self.loop.run_until_complete(
537+
run(stdin=None,
538+
stdout=subprocess.PIPE,
539+
stderr=subprocess.PIPE))
540+
self.loop.run_until_complete(transport._wait())
541+
self.assertEqual(transport.get_returncode(), 0)
542+
self.assertIsNot(transport, None)
543+
self.assertEqual(
544+
set(proto.stages),
545+
{
546+
('CM', transport),
547+
'PROC_EXIT',
548+
('STDOUT', b'1\n'),
549+
('STDOUT', 'LOST'),
550+
('CL', 0, None)
551+
})
552+
467553
def test_process_streams_redirect(self):
468554
# This won't work for asyncio implementation of subprocess
469555

uvloop/handles/pipe.pyx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,13 @@ cdef class UVWritePipeTransport(UVWriteTransport):
111111
object waiter):
112112
cdef UVWritePipeTransport handle
113113
handle = UVWritePipeTransport.__new__(UVWritePipeTransport)
114+
115+
# We listen for read events on write-end of the pipe. When
116+
# the read-end is close, the uv_stream_t.read callback will
117+
# receive an error -- we want to silence that error, and just
118+
# close the transport.
119+
handle._close_on_read_error()
120+
114121
handle._init(loop, protocol, server, waiter)
115122
__pipe_init_uv_handle(<UVStream>handle, loop)
116123
return handle

uvloop/handles/process.pxd

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ cdef class UVProcess(UVHandle):
1717

1818
cdef _init(self, Loop loop, list args, dict env, cwd,
1919
start_new_session,
20-
stdin, stdout, stderr, pass_fds)
20+
stdin, stdout, stderr, pass_fds,
21+
debug_flags)
2122

2223
cdef char** __to_cstring_array(self, list arr)
2324
cdef _init_args(self, list args)
@@ -35,24 +36,35 @@ cdef class UVProcess(UVHandle):
3536
cdef class UVProcessTransport(UVProcess):
3637
cdef:
3738
list _exit_waiters
39+
list _init_futs
40+
bint _stdio_ready
41+
list _pending_calls
3842
object _protocol
43+
bint _finished
3944

4045
UVWritePipeTransport stdin
4146
UVReadPipeTransport stdout
4247
UVReadPipeTransport stderr
4348

49+
object stdin_proto
50+
object stdout_proto
51+
object stderr_proto
52+
4453
cdef _file_redirect_stdio(self, int fd)
4554
cdef _file_devnull(self)
55+
cdef _file_inpipe(self)
4656
cdef _file_outpipe(self)
4757

4858
cdef _check_proc(self)
4959
cdef _pipe_connection_lost(self, int fd, exc)
5060
cdef _pipe_data_received(self, int fd, data)
5161

5262
cdef _call_connection_made(self, waiter)
63+
cdef _try_finish(self)
5364

5465
@staticmethod
5566
cdef UVProcessTransport new(Loop loop, protocol, args, env, cwd,
5667
start_new_session,
5768
stdin, stdout, stderr, pass_fds,
58-
waiter)
69+
waiter,
70+
debug_flags)

0 commit comments

Comments
 (0)