深入理解子进程
原文链接 http://www.kkblog.me/notes/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3%E5%AD%90%E8%BF%9B%E7%A8%8B
注:以下为加速网络访问所做的原文缓存,经过重新格式化,可能存在格式方面的问题,或偶有遗漏信息,请以原文为准。
很多时候,我需要写脚本去做一些自动化操作,简单的可以直接写 Shell 脚本,但一些稍复杂的情况, 比如要用到分支语句,循环语句,或者调用一些高级函数,用 Shell 就太费劲了。 我更喜欢用一种完整的语言(比如 Python),调用 Shell 程序并获取它的输出,执行复杂操作。
本文介绍 UNIX 进程的创建过程(fork, exec),如何与子进程通信(pipe, pty), 并深入分析标准库中的 subprocess 模块和著名的第三方库 sh 的源码,阐述其实现原理。
这些库提供了非常易用的接口,大部分时候只要看看文档就能解决问题。 但它们也无法彻底掩盖实现细节,The Law of Leaky Abstractions - 抽象漏洞法则 解释了许多不完美的抽象,希望这篇文章也能帮你更好地理解子进程。
UNIX进程的创建过程
fork
fork 函数以复制调用进程的方式创建新进程,它 调用一次,返回两次,在子进程中它返回 0
,
在父进程中它返回子进程的ID。
import os
def test_fork():
pid = os.fork()
if pid == 0:
print('child')
os._exit(0)
else:
print('parent')
test_fork()
fork 之后父子进程各自继续往下执行,所以示例中的两个 print
都会执行,
子进程通过 os._exit
退出,否则两个进程共用标准输入,shell 就不能正常工作了。
fork
是个系统调用,可通过 man fork.2
查看其手册。
_exit
也是个系统调用,它与 sys.exit
的区别在于:_exit
直接退出,
而 sys.exit
先执行一些清理工作再退出。可通过 man exit.2
和 man exit.3
查看其手册。
exec
exec 是一系列函数,总共有七个,它把当前进程执行的程序替换成新程序,正常情况下它们 调用一次,永不退出。
import os
os.execv('/bin/sh', ['sh'])
执行示例中的代码之后,进程就变成了 sh,无法再回到 python。
exec 函数中,通常 execve 是系统调用,其他几个都能通过它来实现,
可通过 man execve.2
和 man exec.3
查看其手册。
waitpid
通常 fork 之后,子进程调用 exec 执行新程序,父进程要等待子进程的结果, 这就可以通过 waitpid 实现。如果父进程没有调用 waitpid 获取子进程的退出状态, 那么子进程退出后,状态信息会一直保留在内存中,这样的子进程也被称为僵尸进程。
import os
def system(cmd):
pid = os.fork()
if pid == 0:
os.execv('/bin/sh', ['sh', '-c', cmd])
else:
return os.waitpid(pid, 0)
system('python --version')
这就是 os.system
的实现原理,子进程共用父进程的标准输入输出,父进程阻塞,直到子进程结束。
因为子进程的输出是直接送到标准输出,所以无法被父进程获取。
可通过 man waitpid.2
和 man system.3
查看其手册。
dup2
int dup2(int oldfd, int newfd);
dup2 可以复制文件描述符, 它会先把 newfd 关闭,再把 oldfd 复制到 newfd, 经常用它来修改进程的标准 I/O。
import os, sys
out = open('out.txt', 'w')
os.dup2(out.fileno(), sys.stdout.fileno())
print('这段文字会输出到out.txt中,而不是控制台')
可通过 man dup2.2
查看手册。
进程间通信
管道
进程之间有很多种通信方式,这里只讨论 管道 这种方式,这也是最常用的一种方式。 管道通常是半双工的,只能一端读,另一端写,为了可移植性,不能预先假定系统支持全双工管道。
import os
r, w = os.pipe()
os.write(w, b'hello')
print(os.read(r, 10))
可通过 man pipe.2
查看其手册。
缓冲I/O
I/O 可分为:无缓冲,行缓冲,全缓冲 三种。
通过 read 和 write 系统调用直接读写文件,就是无缓冲模式,性能也最差。 而通过标准 I/O 库读写文件,就是缓冲模式,标准I/O库提供缓冲的目的是尽可能减少 read 和 write 调用的次数,提高性能。
行缓冲模式,当在输入输出中遇到换行符时,才进行实际 I/O 操作。
全缓冲模式,当填满缓冲区时,才进行实际 I/O 操作。
管道和普通文件默认是全缓冲的,标准输入和标准输出默认是行缓冲的,标准错误默认是无缓冲的。
import os
r, w = os.pipe()
fw = os.fdopen(w, 'wb')
fr = os.fdopen(r, 'rb')
fw.write(b'hello')
print(fr.read())
这个例子中,读管道这步会一直阻塞。有两个原因:
- 写管道有缓冲,没有进行实际 I/O,所以读端读不到数据
- 读管道也有缓冲,要读满缓冲区才会返回
只要满足其中任何一个条件都会阻塞。通常写管道是在子进程中进行,我们无法控制其缓冲, 这也是管道的一个局限性。
伪终端
伪终端看起来就像一个双向管道,一端称为 master(主),另一端称为 slave(从)。 从端看上去和真实的终端一样,能够处理所有的终端 I/O 函数。
import os
master, slave = os.openpty()
os.write(master, b'hello from master\n')
print(os.read(slave, 50))
# b'hello from master\n'
os.write(slave, b'hello from slave\n')
print(os.read(master, 50))
# b'hello from master\r\nhello from slave\r\n'
伪终端 echo 默认是开启的,所以最后 master 读的时候,会先读出之前写入的数据。
如果写入的数据没有换行符,就可能不会被传送到另一端, 造成读端一直阻塞(猜测是伪终端的底层实现使用了行缓冲)。
可通过 man openpty.3
和 man pty.7
查看其手册。
subprocess 的实现
subprocess 提供了很多接口, 其核心是 Popen(Process Open),其他部分都通过它来实现。subprocess 也支持 Windows 平台, 这里只分析它在 Unix 平台的实现。
subprocess源码: https://github.com/python/cpython/blob/3.6/Lib/subprocess.py#L540
首先看一下接口原型(L586),参数非常多:
def __init__(self, args, bufsize=-1, executable=None,
stdin=None, stdout=None, stderr=None,
preexec_fn=None, close_fds=_PLATFORM_DEFAULT_CLOSE_FDS,
shell=False, cwd=None, env=None, universal_newlines=False,
startupinfo=None, creationflags=0,
restore_signals=True, start_new_session=False,
pass_fds=(), *, encoding=None, errors=None):
"""Create new Popen instance."""
然后看到中间(L648)一大段注释:
# Input and output objects. The general principle is like
# this:
#
# Parent Child
# ------ -----
# p2cwrite ---stdin---> p2cread
# c2pread <--stdout--- c2pwrite
# errread <--stderr--- errwrite
#
# On POSIX, the child objects are file descriptors. On
# Windows, these are Windows file handles. The parent objects
# are file descriptors on both platforms. The parent objects
# are -1 when not using PIPEs. The child objects are -1
# when not redirecting.
(p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite) = self._get_handles(stdin, stdout, stderr)
父子进程通过这三对文件描述符进行通信。
再跳到 _get_handles
的实现(L1144):
def _get_handles(self, stdin, stdout, stderr):
"""Construct and return tuple with IO objects:
p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite
"""
p2cread, p2cwrite = -1, -1
c2pread, c2pwrite = -1, -1
errread, errwrite = -1, -1
if stdin is None:
pass
elif stdin == PIPE:
p2cread, p2cwrite = os.pipe()
elif stdin == DEVNULL:
p2cread = self._get_devnull()
elif isinstance(stdin, int):
p2cread = stdin
else:
# Assuming file-like object
p2cread = stdin.fileno()
它创建了三对文件描述符,stdin
,stdout
,stderr
这三个参数都是类似的,
分别对应一对文件描述符。当参数等于 PIPE
时,它就会创建一条管道,这也是最常用的参数。
回到 Popen,接着往下看(L684):
if p2cwrite != -1:
self.stdin = io.open(p2cwrite, 'wb', bufsize)
if text_mode:
self.stdin = io.TextIOWrapper(self.stdin, write_through=True,
line_buffering=(bufsize == 1),
encoding=encoding, errors=errors)
if c2pread != -1:
self.stdout = io.open(c2pread, 'rb', bufsize)
if text_mode:
self.stdout = io.TextIOWrapper(self.stdout,
encoding=encoding, errors=errors)
if errread != -1:
self.stderr = io.open(errread, 'rb', bufsize)
if text_mode:
self.stderr = io.TextIOWrapper(self.stderr,
encoding=encoding, errors=errors)
self._execute_child(args, executable, preexec_fn, close_fds,
pass_fds, cwd, env,
startupinfo, creationflags, shell,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite,
restore_signals, start_new_session)
它把其中三个文件描述符变成了文件对象,用于在父进程中和子进程通信。
注意 bufsize
参数只作用于父进程中的文件对象,子进程中的缓冲是没法控制的。
encoding
和 errors
参数就不赘述了,可以查看 io 模块的文档。
再看 _execute_child
的实现(L1198):
def _execute_child(self, args, executable, preexec_fn, close_fds,
pass_fds, cwd, env,
startupinfo, creationflags, shell,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite,
restore_signals, start_new_session):
"""Execute program (POSIX version)"""
if isinstance(args, (str, bytes)):
args = [args]
else:
args = list(args)
if shell:
args = ["/bin/sh", "-c"] + args
if executable:
args[0] = executable
看到这里,args
, shell
, executable
这几个参数就很好理解了。
args
可以是字符串或列表,shell
表示通过 shell 程序执行命令,
executable
是 shell 程序的路径,默认是 /bin/sh
。
当 args
是字符串时,通常配合 shell=True
使用,用来执行任意 shell 命令。
接着往下看(L1221):
# For transferring possible exec failure from child to parent.
# Data format: "exception name:hex errno:description"
# Pickle is not used; it is complex and involves memory allocation.
errpipe_read, errpipe_write = os.pipe()
这是第四对文件描述符。子进程在 fork 之后到 exec 之前会执行许多步骤, 万一这些步骤失败了,就通过这对文件描述向父进程传递异常信息,父进程收到后抛出相应的异常。
创建子进程并执行命令(L1252):
fds_to_keep = set(pass_fds)
fds_to_keep.add(errpipe_write)
self.pid = _posixsubprocess.fork_exec(
args, executable_list,
close_fds, sorted(fds_to_keep), cwd, env_list,
p2cread, p2cwrite, c2pread, c2pwrite,
errread, errwrite,
errpipe_read, errpipe_write,
restore_signals, start_new_session, preexec_fn)
self._child_created = True
继续追踪 _posixsubprocess
源码:
https://github.com/python/cpython/blob/3.6/Modules/_posixsubprocess.c#L545
static PyObject *
subprocess_fork_exec(PyObject* self, PyObject *args){
// ...省略一堆处理参数的代码
pid = fork();
if (pid == 0) {
// L692
child_exec(exec_array, argv, envp, cwd,
p2cread, p2cwrite, c2pread, c2pwrite,
errread, errwrite, errpipe_read, errpipe_write,
close_fds, restore_signals, call_setsid,
py_fds_to_keep, preexec_fn, preexec_fn_args_tuple);
}
// L733
return PyLong_FromPid(pid);
在子进程中执行 child_exec,父进程返回 pid。
继续看 child_exec 的实现(L390):
static void
child_exec(/*一堆参数*/)
{
int i, saved_errno, reached_preexec = 0;
PyObject *result;
const char* err_msg = "";
/* Buffer large enough to hold a hex integer. We can't malloc. */
char hex_errno[sizeof(saved_errno)*2+1];
if (make_inheritable(py_fds_to_keep, errpipe_write) < 0)
goto error;
make_inheritable
把 Popen 中 pass_fds
参数指定的文件描述符设为可继承,
它通过对每个文件描述符调用 _Py_set_inheritable
,清除 close-on-exec
标志。
需要注意,父进程的标准 I/O 默认是可继承的。
参考 PEP 446 -- Make newly created file descriptors non-inheritable
int flags, res;
flags = fcntl(fd, F_GETFD);
if (flags == -1) { /* handle the error */ }
flags |= FD_CLOEXEC;
/* or "flags &= ~FD_CLOEXEC;" to clear the flag */
res = fcntl(fd, F_SETFD, flags);
if (res == -1) { /* handle the error */ }
close-on-exec
(FD_CLOEXEC
) 标志表示这个文件会在进程执行 exec 系统调用时自动关闭。
接着往下看(L430):
/* Dup fds for child.
dup2() removes the CLOEXEC flag but we must do it ourselves if dup2()
would be a no-op (issue #10806). */
if (p2cread == 0) {
if (_Py_set_inheritable(p2cread, 1, NULL) < 0)
goto error;
}
else if (p2cread != -1)
POSIX_CALL(dup2(p2cread, 0)); /* stdin */
通过 dup2 系统调用,把 p2cread 设为子进程的标准输入。标准输出和标准错误也是类似的。
接着往下看(L463):
if (cwd)
POSIX_CALL(chdir(cwd));
if (restore_signals)
_Py_RestoreSignals();
#ifdef HAVE_SETSID
if (call_setsid)
POSIX_CALL(setsid());
#endif
cwd
参数,设置当前工作目录。
restore_signals
参数,把信号处理恢复为默认值,涉及信号处理的内容,这里略过。
call_setsid
,即 Popen 的 start_new_session
参数,
创建会话并设置进程组 ID,内容太多也略过。
接着往下看(L474):
reached_preexec = 1;
if (preexec_fn != Py_None && preexec_fn_args_tuple) {
/* This is where the user has asked us to deadlock their program. */
result = PyObject_Call(preexec_fn, preexec_fn_args_tuple, NULL);
if (result == NULL) {
/* Stringifying the exception or traceback would involve
* memory allocation and thus potential for deadlock.
* We've already faced potential deadlock by calling back
* into Python in the first place, so it probably doesn't
* matter but we avoid it to minimize the possibility. */
err_msg = "Exception occurred in preexec_fn.";
errno = 0; /* We don't want to report an OSError. */
goto error;
}
/* Py_DECREF(result); - We're about to exec so why bother? */
}
/* close FDs after executing preexec_fn, which might open FDs */
if (close_fds) {
/* TODO HP-UX could use pstat_getproc() if anyone cares about it. */
_close_open_fds(3, py_fds_to_keep);
}
执行 preexec_fn
,随后根据 close_fds
参数判断是否关闭打开的文件描述符。
最后,执行命令(L497):
/* This loop matches the Lib/os.py _execvpe()'s PATH search when */
/* given the executable_list generated by Lib/subprocess.py. */
saved_errno = 0;
for (i = 0; exec_array[i] != NULL; ++i) {
const char *executable = exec_array[i];
if (envp) {
execve(executable, argv, envp);
} else {
execv(executable, argv);
}
if (errno != ENOENT && errno != ENOTDIR && saved_errno == 0) {
saved_errno = errno;
}
}
尝试执行命令,只要有一个成功,后面的就不会执行了,因为 exec 执行成功后永不返回。
Popen 的创建过程到这就结束了,子进程已经运行起来了,接下来分析如何与子进程通信。
跳到 Popen 中的 communicate 方法(L796):
def communicate(self, input=None, timeout=None):
if self._communication_started and input:
raise ValueError("Cannot send input after starting communication")
这个 raise ValueError
是为什么呢?稍后解答。
继续往下看(L813):
# Optimization: If we are not worried about timeouts, we haven't
# started communicating, and we have one or zero pipes, using select()
# or threads is unnecessary.
if (timeout is None and not self._communication_started and
[self.stdin, self.stdout, self.stderr].count(None) >= 2):
stdout = None
stderr = None
if self.stdin:
self._stdin_write(input)
elif self.stdout:
stdout = self.stdout.read()
self.stdout.close()
elif self.stderr:
stderr = self.stderr.read()
self.stderr.close()
self.wait()
else:
if timeout is not None:
endtime = _time() + timeout
else:
endtime = None
try:
stdout, stderr = self._communicate(input, endtime, timeout)
finally:
self._communication_started = True
sts = self.wait(timeout=self._remaining_time(endtime))
return (stdout, stderr)
if
部分直接和子进程通信,else
部分调用了 self._communicate
,
用线程和 I/O 多路复用的方式与子进程通信,去掉影响也不大,这部分细节就不分析了。
注意 self._stdin_write(input)
很关键,在 self._communicate
中也是调用它向子进程写数据。
看 _stdin_write
的实现(L773):
def _stdin_write(self, input):
# L776
self.stdin.write(input)
# L787
self.stdin.close()
省略了异常处理的代码,主要就这两句。可以看到写完输入之后,立即把标准输入关闭了。
结合上面的 raise ValueError
,可以看出只能向子进程写入一次。
这涉及到缓冲的问题,管道是默认全缓冲的,如果不立即关闭写端,子进程读的时候就可能一直阻塞, 我们没法控制子进程使用什么类型的缓冲。同样的,如果子进程一直写输出,父进程也会读不到数据, 只有子进程退出之后,系统自动关闭它的标准输出,父进程才能读到数据。
下面是个例子:
# hello.py
from time import sleep
while True:
print('hello world')
sleep(1)
# test_hello.py
from subprocess import *
p = Popen(['python', 'hello.py'], stdin=PIPE, stdout=PIPE)
out, err = p.communicate()
print(out)
运行 python test_hello.py
,尽管子进程在不停地输出,但是父进程一直读取不到数据。
此时如果我们把子进程杀死,父进程便会立即读到数据。
subprocess 的核心代码到这就分析完了。
再放个自己实现的 popen(未考虑各种异常情况,仅用于说明实现原理):
# popen.py
import os
class popen:
def __init__(self, cmd, *, cwd=None, env=None, input=None):
argv = ['/bin/sh', '-c', cmd]
# Parent Child
# ------ -----
# p2cwrite ---stdin---> p2cread
# c2pread <--stdout--- c2pwrite
# errread <--stderr--- errwrite
p2cread, p2cwrite = os.pipe()
c2pread, c2pwrite = os.pipe()
errread, errwrite = os.pipe()
pid = os.fork()
if pid == 0:
os.close(p2cwrite)
os.close(c2pread)
os.close(errread)
os.dup2(p2cread, 0)
os.dup2(c2pwrite, 1)
os.dup2(errwrite, 2)
if cwd:
os.chdir(cwd)
if env:
os.execve('/bin/sh', argv, env)
else:
os.execv('/bin/sh', argv)
else:
os.close(p2cread)
os.close(c2pwrite)
os.close(errwrite)
stdin = open(p2cwrite, 'w')
stdout = open(c2pread, 'r')
stderr = open(errread, 'r')
# communicate
if input:
stdin.write(input)
stdin.close()
self.out = stdout.read()
stdout.close()
self.err = stderr.read()
stderr.read()
self.status = os.waitpid(pid, 0)[1]
def __repr__(self):
fmt = ('STATUS:{}\n' +
'OUTPUT'.center(80, '-') + '\n{}\n' +
'ERROR'.center(80, '-') + '\n{}\n')
return fmt.format(self.status, self.out, self.err)
执行命令试一试:
>>> from popen import popen
>>> popen('python -c "import this"')
STATUS:0
-------------------------------------OUTPUT-------------------------------------
The Zen of Python, by Tim Peters
Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one-- and preferably only one --obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!
-------------------------------------ERROR--------------------------------------
>>> popen('python -c "import that"')
STATUS:256
-------------------------------------OUTPUT-------------------------------------
-------------------------------------ERROR--------------------------------------
Traceback (most recent call last):
File "<string>", line 1, in <module>
ModuleNotFoundError: No module named 'that'
>>> popen('python --version').out
'Python 3.6.1\n'
>>>
sh 的实现
sh 为 Shell 操作提供了一个非常好用的接口,功能也非常强大。
from sh import ls
print(ls('-l'))
源码:https://github.com/amoffat/sh/blob/1.12.14/sh.py
代码总共 3500 多行,这里只介绍它实现中的关键部分,有兴趣可以详细阅读它的源码。 读代码之前最好先看一下它的文档 Reference,了解各个对象间的逻辑关系以及代码执行流程,避免陷入细节中。
从 sh 模块导入任意命令
我们可以从 sh 模块导入任意命令,而且这些命令都是动态生成的。
下面解析它的实现:
# magicmodule.py
import sys
from types import ModuleType
class MagicModule(ModuleType):
def __init__(self, name):
super().__init__(name)
def __getattr__(self, name):
return name
sys.modules[__name__] = MagicModule(__name__)
当导入一个模块时,Python 的导入机制会创建一个模块对象存到 sys.modules[__name__]
中,而 from module import attr
就会获取该模块对象的属性。
把系统创建的模块对象替换成实现了 __getattr__
方法的模块对象,就能导入任意命令了。
>>> from magicmodule import anything
>>> anything
'anything'
>>>
sh 中相关代码在 L3333 SelfWrapper
类 和 L3581 修改 sys.modules
。
优雅的异常处理
sh 支持这样的用法:
from sh import ErrorReturnCode_12
from sh import SignalException_9
from sh import SignalException_SIGHUP
try:
some_cmd()
except ErrorReturnCode_12:
print("couldn't do X")
这些异常类都是动态生成的(L433):
def get_exc_from_name(name):
"""takes an exception name, like:
ErrorReturnCode_1
SignalException_9
SignalException_SIGHUP
and returns the corresponding exception. this is primarily used for
importing exceptions from sh into user code, for instance, to capture those
exceptions
"""
它用下面这个正则表达式(L424)匹配异常类的名称,取出返回码或者信号码:
rc_exc_regex = re.compile("(ErrorReturnCode|SignalException)_((\d+)|SIG[a-zA-Z]+)")
然后动态生成一个类:
exc = ErrorReturnCodeMeta(name, (base,), {"exit_code": rc})
ErrorReturnCodeMeta
(L325)是一个元类,动态生成的异常类是元类的实例。
which 命令
sh 用 Python 实现了 which
命令的功能(L522):
>>> sh.which('ls')
'/usr/bin/ls'
实现原理就是遍历 PATH
环境变量中所有的路径,找到符合要求的可执行文件:
# L528
def is_exe(fpath):
# 存在 & 可执行 & 是个文件
return (os.path.exists(fpath) and
os.access(fpath, os.X_OK) and
os.path.isfile(os.path.realpath(fpath)))
# L554
for path in paths_to_search:
exe_file = os.path.join(path, program)
if is_exe(exe_file):
found_path = exe_file
break
后面启动子进程需要用到这个函数。
def resolve_command_path(program):
# 查找可执行文件
path = which(program)
if not path:
# 替换下划线为横杠,再次尝试
if "_" in program:
path = which(program.replace("_", "-"))
if not path:
return None
return path
# 导入命令时调用此函数,创建Command对象
def resolve_command(name, baked_args=None):
path = resolve_command_path(name)
cmd = None
if path:
cmd = Command(path)
if baked_args:
cmd = cmd.bake(**baked_args)
return cmd
Command 类的实现
接下来先看 Command
类的实现(L1054),所有命令都是这个类的实例。
class Command(object):
""" represents an un-run system program, like "ls" or "cd". """
# L1065
_call_args = {
"fg": False, # run command in foreground
# run a command in the background. commands run in the background
# ignore SIGHUP and do not automatically exit when the parent process
# ends
"bg": False,
# ...一堆参数
}
# L1188
def __init__(self, path, search_paths=None):
found = which(path, search_paths)
# L1209
self._path = encode_to_py3bytes_or_py2str(found)
它把大量的参数定义放在 _call_args
中,这样有几个好处:
- 很方便处理大量参数
- 很方便写注释
- 可读性更好
因为要允许用户直接创建 Command 对象,所以又调用了一次 which
。
接着看它怎么处理参数的(L1236):
@staticmethod
def _extract_call_args(kwargs):
""" takes kwargs that were passed to a command's __call__ and extracts
out the special keyword arguments, we return a tuple of special keyword
args, and kwargs that will go to the execd command """
kwargs = kwargs.copy()
call_args = {}
for parg, default in Command._call_args.items():
key = "_" + parg
if key in kwargs:
call_args[parg] = kwargs[key]
del kwargs[key]
invalid_kwargs = special_kwarg_validator(call_args,
Command._kwarg_validators)
if invalid_kwargs:
exc_msg = []
for args, error_msg in invalid_kwargs:
exc_msg.append(" %r: %s" % (args, error_msg))
exc_msg = "\n".join(exc_msg)
raise TypeError("Invalid special arguments:\n\n%s\n" % exc_msg)
return call_args, kwargs
它将参数分成特殊参数(下划线开头)和普通参数,特殊参数能够控制命令的执行过程, 还能看到它对特殊参数进行了统一校验,出错提示也非常清晰。
Command 对象的 bake
方法,功能类似于 functools.partial
:
>>> from sh import ls
>>> lslh = ls.bake('-l', '-h')
>>> lslh()
total 56K
-rw-r--r-- 1 guyskk guyskk 155 May 12 13:00 aaa.json
-rw-r--r-- 1 guyskk guyskk 162 May 12 13:00 bbb.py
...
>>> ls
<Command '/usr/bin/ls'>
>>> lslh
<Command '/usr/bin/ls -l -h'>
>>>
bake
方法的实现(L1265):
def bake(self, *args, **kwargs):
fn = type(self)(self._path)
fn._partial = True
call_args, kwargs = self._extract_call_args(kwargs)
pruned_call_args = call_args
for k, v in Command._call_args.items():
try:
if pruned_call_args[k] == v:
del pruned_call_args[k]
except KeyError:
continue
fn._partial_call_args.update(self._partial_call_args)
fn._partial_call_args.update(pruned_call_args)
fn._partial_baked_args.extend(self._partial_baked_args)
sep = pruned_call_args.get("long_sep", self._call_args["long_sep"])
prefix = pruned_call_args.get("long_prefix",
self._call_args["long_prefix"])
fn._partial_baked_args.extend(compile_args(args, kwargs, sep, prefix))
return fn
_partial_call_args
属性稍后会用到。
Command 是 callable 对象,它的 __call__
方法实现比较复杂(L1324):
def __call__(self, *args, **kwargs):
# ...中间的具体实现不太好理解,先看最后一行
return RunningCommand(cmd, call_args, stdin, stdout, stderr)
RunningCommand 是创建子进程执行命令,所以这里主要是处理参数和三个标准IO。
处理管道命令, 如果第一个参数是正在运行的命令,就复用它的标准输入:
# L1373
# check if we're piping via composition
stdin = call_args["in"]
if args:
first_arg = args.pop(0)
if isinstance(first_arg, RunningCommand):
if first_arg.call_args["piped"]:
stdin = first_arg.process
else:
stdin = first_arg.process._pipe_queue
else:
args.insert(0, first_arg)
处理 fg
(foreground) 参数:
if call_args["fg"]:
if call_args["env"] is None:
launch = lambda: os.spawnv(os.P_WAIT, cmd[0], cmd)
else:
launch = lambda: os.spawnve(os.P_WAIT, cmd[0], cmd, call_args["env"])
exit_code = launch()
os.spawn*
和 os.system
运行的效果差不多,区别在于它不需要通过 sh 进程执行命令。
out
参数(err
参数也差不多):
# stdout redirection
stdout = call_args["out"]
if output_redirect_is_filename(stdout):
stdout = open(str(stdout), "wb")
RunningCommand 的实现
先看接口(L649):
class RunningCommand(object):
"""this represents an executing Command object."""
def __init__(self, cmd, call_args, stdin, stdout, stderr):
"""
cmd is an array, where each element is encoded as bytes (PY3) or str
(PY2)
"""
其实和 Popen 的接口差不多,只是它把一堆参数放在 call_args
里面了。
其中有个 iter
参数允许迭代获取输出,而不是等子进程结束后再一次性获取。
# set up which stream should write to the pipe
# TODO, make pipe None by default and limit the size of the Queue
# in oproc.OProc
pipe = OProc.STDOUT
if call_args["iter"] == "out" or call_args["iter"] is True:
pipe = OProc.STDOUT
elif call_args["iter"] == "err":
pipe = OProc.STDERR
通过 OProc
创建进程执行命令(L750),之后等待进程结束:
if spawn_process:
# this lock is needed because of a race condition where a background
# thread, created in the OProc constructor, may try to access
# self.process, but it has not been assigned yet
process_assign_lock = threading.Lock()
with process_assign_lock:
self.process = OProc(self, self.log, cmd, stdin, stdout, stderr,
self.call_args, pipe, process_assign_lock)
if should_wait:
self.wait()
其实 RunningCommand
实现了 __str__
和 __repr__
方法,
所以它看上去像字符串,它也实现了 __iter__
方法,也就能迭代获取输出。
>>> ret = sh.ls('-l')
>>> type(ret)
<class 'sh.RunningCommand'>
>>> for line in ret:
... print(line)
total 28392
-rwxr-xr-x 1 guyskk guyskk 8464 Jul 1 18:34 a.out
-rw-r--r-- 1 guyskk guyskk 421 Jun 4 22:15 app.py
OProc 的实现
OProc(L1678)封装了创建进程以及进程通信的逻辑,绝大部分特殊参数都是在这处理的。 它的构造函数特别特别长,逻辑太多了。
特殊参数这么多,估计作者也很无奈:
# convenience
ca = self.call_args
这里主要看一下 伪终端 相关的参数:
_tty_in
Default value: False, meaning a os.pipe() will be used._tty_out
Default value: True
If True, sh creates a TTY for STDOUT, otherwise use a os.pipe().
子进程的输入默认是管道,输出默认是伪终端。 伪终端是行缓存模式,所以能不停地取到输出,对比一下前面用 Popen 运行 hello.py 的效果:
>>> proc = sh.python('hello.py', _iter='out')
>>> for line in proc:
... print(line)
...
hello world
hello world
hello world
sh 把 _tty_out
默认值设为 True 使得它在兼容性方面比 Popen 好很多,
Why is _tty_out=True the default?
大致看一下实现:
# L1770
elif ca["tty_in"]:
self._stdin_read_fd, self._stdin_write_fd = pty.openpty()
# tty_in=False is the default
else:
self._stdin_write_fd, self._stdin_read_fd = os.pipe()
# L1782
# tty_out=True is the default
elif ca["tty_out"]:
self._stdout_read_fd, self._stdout_write_fd = pty.openpty()
else:
self._stdout_read_fd, self._stdout_write_fd = os.pipe()
后面的 fork, exec 和 Popen 几乎一样,就不重复介绍了。
读后感
sh 的代码我看了大约一周才理清其中的逻辑,代码太复杂了。
大致有几个原因:
- Unix 进程本身的复杂性,概念和暗坑很多
- 用了一些不为人知的 Python 特性(黑魔法)
- 源码只有一个文件,3500+ 代码,代码结构不清晰, bottle 项目也是这样的问题
- 功能太强大了
源码中还有非常多细节我没有提到,其实很多我也不明白,所以只能大致地介绍几个要点, 梳理一下命令的执行过程,希望能有所帮助 ╮( ̄▽ ̄")╭