深入理解子进程

2017-07-29 kk 更多博文 » 博客 » GitHub »

原文链接 http://www.kkblog.me/notes/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3%E5%AD%90%E8%BF%9B%E7%A8%8B
注:以下为加速网络访问所做的原文缓存,经过重新格式化,可能存在格式方面的问题,或偶有遗漏信息,请以原文为准。


很多时候,我需要写脚本去做一些自动化操作,简单的可以直接写 Shell 脚本,但一些稍复杂的情况, 比如要用到分支语句,循环语句,或者调用一些高级函数,用 Shell 就太费劲了。 我更喜欢用一种完整的语言(比如 Python),调用 Shell 程序并获取它的输出,执行复杂操作。

本文介绍 UNIX 进程的创建过程(fork, exec),如何与子进程通信(pipe, pty), 并深入分析标准库中的 subprocess 模块和著名的第三方库 sh 的源码,阐述其实现原理。

这些库提供了非常易用的接口,大部分时候只要看看文档就能解决问题。 但它们也无法彻底掩盖实现细节,The Law of Leaky Abstractions - 抽象漏洞法则 解释了许多不完美的抽象,希望这篇文章也能帮你更好地理解子进程。

UNIX进程的创建过程

fork

fork 函数以复制调用进程的方式创建新进程,它 调用一次,返回两次,在子进程中它返回 0, 在父进程中它返回子进程的ID。

import os

def test_fork():
    pid = os.fork()
    if pid == 0:
        print('child')
        os._exit(0)
    else:
        print('parent')

test_fork()

fork 之后父子进程各自继续往下执行,所以示例中的两个 print 都会执行, 子进程通过 os._exit 退出,否则两个进程共用标准输入,shell 就不能正常工作了。

fork 是个系统调用,可通过 man fork.2 查看其手册。

_exit 也是个系统调用,它与 sys.exit 的区别在于:_exit 直接退出, 而 sys.exit 先执行一些清理工作再退出。可通过 man exit.2man exit.3 查看其手册。

exec

exec 是一系列函数,总共有七个,它把当前进程执行的程序替换成新程序,正常情况下它们 调用一次,永不退出

import os
os.execv('/bin/sh', ['sh'])

执行示例中的代码之后,进程就变成了 sh,无法再回到 python。

exec 函数中,通常 execve 是系统调用,其他几个都能通过它来实现, 可通过 man execve.2man exec.3 查看其手册。

waitpid

通常 fork 之后,子进程调用 exec 执行新程序,父进程要等待子进程的结果, 这就可以通过 waitpid 实现。如果父进程没有调用 waitpid 获取子进程的退出状态, 那么子进程退出后,状态信息会一直保留在内存中,这样的子进程也被称为僵尸进程。

import os

def system(cmd):
    pid = os.fork()
    if pid == 0:
        os.execv('/bin/sh', ['sh', '-c', cmd])
    else:
        return os.waitpid(pid, 0)

system('python --version')

这就是 os.system 的实现原理,子进程共用父进程的标准输入输出,父进程阻塞,直到子进程结束。 因为子进程的输出是直接送到标准输出,所以无法被父进程获取。

可通过 man waitpid.2man system.3 查看其手册。

dup2

int dup2(int oldfd, int newfd);

dup2 可以复制文件描述符, 它会先把 newfd 关闭,再把 oldfd 复制到 newfd, 经常用它来修改进程的标准 I/O。

import os, sys

out = open('out.txt', 'w')
os.dup2(out.fileno(), sys.stdout.fileno())
print('这段文字会输出到out.txt中,而不是控制台')

可通过 man dup2.2 查看手册。

进程间通信

管道

进程之间有很多种通信方式,这里只讨论 管道 这种方式,这也是最常用的一种方式。 管道通常是半双工的,只能一端读,另一端写,为了可移植性,不能预先假定系统支持全双工管道。

import os

r, w = os.pipe()
os.write(w, b'hello')
print(os.read(r, 10))

可通过 man pipe.2 查看其手册。

缓冲I/O

I/O 可分为:无缓冲行缓冲全缓冲 三种。

通过 read 和 write 系统调用直接读写文件,就是无缓冲模式,性能也最差。 而通过标准 I/O 库读写文件,就是缓冲模式,标准I/O库提供缓冲的目的是尽可能减少 read 和 write 调用的次数,提高性能。

行缓冲模式,当在输入输出中遇到换行符时,才进行实际 I/O 操作。
全缓冲模式,当填满缓冲区时,才进行实际 I/O 操作。

管道和普通文件默认是全缓冲的,标准输入和标准输出默认是行缓冲的,标准错误默认是无缓冲的。

import os

r, w = os.pipe()
fw = os.fdopen(w, 'wb')
fr = os.fdopen(r, 'rb')
fw.write(b'hello')
print(fr.read())

这个例子中,读管道这步会一直阻塞。有两个原因:

  1. 写管道有缓冲,没有进行实际 I/O,所以读端读不到数据
  2. 读管道也有缓冲,要读满缓冲区才会返回

只要满足其中任何一个条件都会阻塞。通常写管道是在子进程中进行,我们无法控制其缓冲, 这也是管道的一个局限性。

伪终端

伪终端看起来就像一个双向管道,一端称为 master(主),另一端称为 slave(从)。 从端看上去和真实的终端一样,能够处理所有的终端 I/O 函数。

import os

master, slave = os.openpty()
os.write(master, b'hello from master\n')
print(os.read(slave, 50))
# b'hello from master\n'
os.write(slave, b'hello from slave\n')
print(os.read(master, 50))
# b'hello from master\r\nhello from slave\r\n'

伪终端 echo 默认是开启的,所以最后 master 读的时候,会先读出之前写入的数据。

如果写入的数据没有换行符,就可能不会被传送到另一端, 造成读端一直阻塞(猜测是伪终端的底层实现使用了行缓冲)。

可通过 man openpty.3man pty.7 查看其手册。

subprocess 的实现

subprocess 提供了很多接口, 其核心是 Popen(Process Open),其他部分都通过它来实现。subprocess 也支持 Windows 平台, 这里只分析它在 Unix 平台的实现。

subprocess源码: https://github.com/python/cpython/blob/3.6/Lib/subprocess.py#L540

首先看一下接口原型(L586),参数非常多:

def __init__(self, args, bufsize=-1, executable=None,
             stdin=None, stdout=None, stderr=None,
             preexec_fn=None, close_fds=_PLATFORM_DEFAULT_CLOSE_FDS,
             shell=False, cwd=None, env=None, universal_newlines=False,
             startupinfo=None, creationflags=0,
             restore_signals=True, start_new_session=False,
             pass_fds=(), *, encoding=None, errors=None):
    """Create new Popen instance."""

然后看到中间(L648)一大段注释:

# Input and output objects. The general principle is like
# this:
#
# Parent                   Child
# ------                   -----
# p2cwrite   ---stdin--->  p2cread
# c2pread    <--stdout---  c2pwrite
# errread    <--stderr---  errwrite
#
# On POSIX, the child objects are file descriptors.  On
# Windows, these are Windows file handles.  The parent objects
# are file descriptors on both platforms.  The parent objects
# are -1 when not using PIPEs. The child objects are -1
# when not redirecting.

(p2cread, p2cwrite,
 c2pread, c2pwrite,
 errread, errwrite) = self._get_handles(stdin, stdout, stderr)

父子进程通过这三对文件描述符进行通信。

再跳到 _get_handles 的实现(L1144):

def _get_handles(self, stdin, stdout, stderr):
    """Construct and return tuple with IO objects:
    p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite
    """
    p2cread, p2cwrite = -1, -1
    c2pread, c2pwrite = -1, -1
    errread, errwrite = -1, -1

    if stdin is None:
        pass
    elif stdin == PIPE:
        p2cread, p2cwrite = os.pipe()
    elif stdin == DEVNULL:
        p2cread = self._get_devnull()
    elif isinstance(stdin, int):
        p2cread = stdin
    else:
        # Assuming file-like object
        p2cread = stdin.fileno()

它创建了三对文件描述符,stdinstdoutstderr 这三个参数都是类似的, 分别对应一对文件描述符。当参数等于 PIPE 时,它就会创建一条管道,这也是最常用的参数。

回到 Popen,接着往下看(L684):

if p2cwrite != -1:
    self.stdin = io.open(p2cwrite, 'wb', bufsize)
    if text_mode:
        self.stdin = io.TextIOWrapper(self.stdin, write_through=True,
                line_buffering=(bufsize == 1),
                encoding=encoding, errors=errors)
if c2pread != -1:
    self.stdout = io.open(c2pread, 'rb', bufsize)
    if text_mode:
        self.stdout = io.TextIOWrapper(self.stdout,
                encoding=encoding, errors=errors)
if errread != -1:
    self.stderr = io.open(errread, 'rb', bufsize)
    if text_mode:
        self.stderr = io.TextIOWrapper(self.stderr,
                encoding=encoding, errors=errors)

self._execute_child(args, executable, preexec_fn, close_fds,
                    pass_fds, cwd, env,
                    startupinfo, creationflags, shell,
                    p2cread, p2cwrite,
                    c2pread, c2pwrite,
                    errread, errwrite,
                    restore_signals, start_new_session)

它把其中三个文件描述符变成了文件对象,用于在父进程中和子进程通信。 注意 bufsize 参数只作用于父进程中的文件对象,子进程中的缓冲是没法控制的。

encodingerrors 参数就不赘述了,可以查看 io 模块的文档。

再看 _execute_child 的实现(L1198):

def _execute_child(self, args, executable, preexec_fn, close_fds,
                   pass_fds, cwd, env,
                   startupinfo, creationflags, shell,
                   p2cread, p2cwrite,
                   c2pread, c2pwrite,
                   errread, errwrite,
                   restore_signals, start_new_session):
    """Execute program (POSIX version)"""

    if isinstance(args, (str, bytes)):
        args = [args]
    else:
        args = list(args)

    if shell:
        args = ["/bin/sh", "-c"] + args
        if executable:
            args[0] = executable

看到这里,args, shell, executable 这几个参数就很好理解了。

args 可以是字符串或列表,shell 表示通过 shell 程序执行命令, executable 是 shell 程序的路径,默认是 /bin/sh

args 是字符串时,通常配合 shell=True 使用,用来执行任意 shell 命令。

接着往下看(L1221):

# For transferring possible exec failure from child to parent.
# Data format: "exception name:hex errno:description"
# Pickle is not used; it is complex and involves memory allocation.
errpipe_read, errpipe_write = os.pipe()

这是第四对文件描述符。子进程在 fork 之后到 exec 之前会执行许多步骤, 万一这些步骤失败了,就通过这对文件描述向父进程传递异常信息,父进程收到后抛出相应的异常。

创建子进程并执行命令(L1252):

fds_to_keep = set(pass_fds)
fds_to_keep.add(errpipe_write)
self.pid = _posixsubprocess.fork_exec(
        args, executable_list,
        close_fds, sorted(fds_to_keep), cwd, env_list,
        p2cread, p2cwrite, c2pread, c2pwrite,
        errread, errwrite,
        errpipe_read, errpipe_write,
        restore_signals, start_new_session, preexec_fn)
self._child_created = True

继续追踪 _posixsubprocess 源码: https://github.com/python/cpython/blob/3.6/Modules/_posixsubprocess.c#L545

static PyObject *
subprocess_fork_exec(PyObject* self, PyObject *args){
// ...省略一堆处理参数的代码
pid = fork();
if (pid == 0) {
    // L692
    child_exec(exec_array, argv, envp, cwd,
               p2cread, p2cwrite, c2pread, c2pwrite,
               errread, errwrite, errpipe_read, errpipe_write,
               close_fds, restore_signals, call_setsid,
               py_fds_to_keep, preexec_fn, preexec_fn_args_tuple);
}
// L733
return PyLong_FromPid(pid);

在子进程中执行 child_exec,父进程返回 pid。

继续看 child_exec 的实现(L390):

static void
child_exec(/*一堆参数*/)
{
    int i, saved_errno, reached_preexec = 0;
    PyObject *result;
    const char* err_msg = "";
    /* Buffer large enough to hold a hex integer.  We can't malloc. */
    char hex_errno[sizeof(saved_errno)*2+1];

    if (make_inheritable(py_fds_to_keep, errpipe_write) < 0)
        goto error;

make_inheritable 把 Popen 中 pass_fds 参数指定的文件描述符设为可继承, 它通过对每个文件描述符调用 _Py_set_inheritable,清除 close-on-exec 标志。

需要注意,父进程的标准 I/O 默认是可继承的。

参考 PEP 446 -- Make newly created file descriptors non-inheritable

int flags, res;
flags = fcntl(fd, F_GETFD);
if (flags == -1) { /* handle the error */ }
flags |= FD_CLOEXEC;
/* or "flags &= ~FD_CLOEXEC;" to clear the flag */
res = fcntl(fd, F_SETFD, flags);
if (res == -1) { /* handle the error */ }

close-on-exec(FD_CLOEXEC) 标志表示这个文件会在进程执行 exec 系统调用时自动关闭。

接着往下看(L430):

/* Dup fds for child.
   dup2() removes the CLOEXEC flag but we must do it ourselves if dup2()
   would be a no-op (issue #10806). */
if (p2cread == 0) {
    if (_Py_set_inheritable(p2cread, 1, NULL) < 0)
        goto error;
}
else if (p2cread != -1)
    POSIX_CALL(dup2(p2cread, 0));  /* stdin */

通过 dup2 系统调用,把 p2cread 设为子进程的标准输入。标准输出和标准错误也是类似的。

接着往下看(L463):

    if (cwd)
           POSIX_CALL(chdir(cwd));

    if (restore_signals)
       _Py_RestoreSignals();

#ifdef HAVE_SETSID
    if (call_setsid)
        POSIX_CALL(setsid());
#endif

cwd 参数,设置当前工作目录。
restore_signals 参数,把信号处理恢复为默认值,涉及信号处理的内容,这里略过。
call_setsid,即 Popen 的 start_new_session 参数, 创建会话并设置进程组 ID,内容太多也略过。

接着往下看(L474):

reached_preexec = 1;
if (preexec_fn != Py_None && preexec_fn_args_tuple) {
    /* This is where the user has asked us to deadlock their program. */
    result = PyObject_Call(preexec_fn, preexec_fn_args_tuple, NULL);
    if (result == NULL) {
        /* Stringifying the exception or traceback would involve
         * memory allocation and thus potential for deadlock.
         * We've already faced potential deadlock by calling back
         * into Python in the first place, so it probably doesn't
         * matter but we avoid it to minimize the possibility. */
        err_msg = "Exception occurred in preexec_fn.";
        errno = 0;  /* We don't want to report an OSError. */
        goto error;
    }
    /* Py_DECREF(result); - We're about to exec so why bother? */
}

/* close FDs after executing preexec_fn, which might open FDs */
if (close_fds) {
    /* TODO HP-UX could use pstat_getproc() if anyone cares about it. */
    _close_open_fds(3, py_fds_to_keep);
}

执行 preexec_fn,随后根据 close_fds 参数判断是否关闭打开的文件描述符。

最后,执行命令(L497):

/* This loop matches the Lib/os.py _execvpe()'s PATH search when */
/* given the executable_list generated by Lib/subprocess.py.     */
saved_errno = 0;
for (i = 0; exec_array[i] != NULL; ++i) {
    const char *executable = exec_array[i];
    if (envp) {
        execve(executable, argv, envp);
    } else {
        execv(executable, argv);
    }
    if (errno != ENOENT && errno != ENOTDIR && saved_errno == 0) {
        saved_errno = errno;
    }
}

尝试执行命令,只要有一个成功,后面的就不会执行了,因为 exec 执行成功后永不返回。

Popen 的创建过程到这就结束了,子进程已经运行起来了,接下来分析如何与子进程通信。

跳到 Popen 中的 communicate 方法(L796):

def communicate(self, input=None, timeout=None):
    if self._communication_started and input:
        raise ValueError("Cannot send input after starting communication")

这个 raise ValueError 是为什么呢?稍后解答。

继续往下看(L813):

# Optimization: If we are not worried about timeouts, we haven't
# started communicating, and we have one or zero pipes, using select()
# or threads is unnecessary.
if (timeout is None and not self._communication_started and
    [self.stdin, self.stdout, self.stderr].count(None) >= 2):
    stdout = None
    stderr = None
    if self.stdin:
        self._stdin_write(input)
    elif self.stdout:
        stdout = self.stdout.read()
        self.stdout.close()
    elif self.stderr:
        stderr = self.stderr.read()
        self.stderr.close()
    self.wait()
else:
    if timeout is not None:
        endtime = _time() + timeout
    else:
        endtime = None

    try:
        stdout, stderr = self._communicate(input, endtime, timeout)
    finally:
        self._communication_started = True

    sts = self.wait(timeout=self._remaining_time(endtime))

return (stdout, stderr)

if 部分直接和子进程通信,else 部分调用了 self._communicate, 用线程和 I/O 多路复用的方式与子进程通信,去掉影响也不大,这部分细节就不分析了。 注意 self._stdin_write(input) 很关键,在 self._communicate 中也是调用它向子进程写数据。

_stdin_write 的实现(L773):

def _stdin_write(self, input):
    # L776
    self.stdin.write(input)
    # L787
    self.stdin.close()

省略了异常处理的代码,主要就这两句。可以看到写完输入之后,立即把标准输入关闭了。 结合上面的 raise ValueError,可以看出只能向子进程写入一次。

这涉及到缓冲的问题,管道是默认全缓冲的,如果不立即关闭写端,子进程读的时候就可能一直阻塞, 我们没法控制子进程使用什么类型的缓冲。同样的,如果子进程一直写输出,父进程也会读不到数据, 只有子进程退出之后,系统自动关闭它的标准输出,父进程才能读到数据。

下面是个例子:

# hello.py
from time import sleep

while True:
    print('hello world')
    sleep(1)
# test_hello.py
from subprocess import *

p = Popen(['python', 'hello.py'], stdin=PIPE, stdout=PIPE)
out, err = p.communicate()
print(out)

运行 python test_hello.py,尽管子进程在不停地输出,但是父进程一直读取不到数据。 此时如果我们把子进程杀死,父进程便会立即读到数据。

subprocess 的核心代码到这就分析完了。
再放个自己实现的 popen(未考虑各种异常情况,仅用于说明实现原理):

# popen.py
import os


class popen:

    def __init__(self, cmd, *, cwd=None, env=None, input=None):
        argv = ['/bin/sh', '-c', cmd]
        # Parent                   Child
        # ------                   -----
        # p2cwrite   ---stdin--->  p2cread
        # c2pread    <--stdout---  c2pwrite
        # errread    <--stderr---  errwrite
        p2cread, p2cwrite = os.pipe()
        c2pread, c2pwrite = os.pipe()
        errread, errwrite = os.pipe()
        pid = os.fork()
        if pid == 0:
            os.close(p2cwrite)
            os.close(c2pread)
            os.close(errread)
            os.dup2(p2cread, 0)
            os.dup2(c2pwrite, 1)
            os.dup2(errwrite, 2)
            if cwd:
                os.chdir(cwd)
            if env:
                os.execve('/bin/sh', argv, env)
            else:
                os.execv('/bin/sh', argv)
        else:
            os.close(p2cread)
            os.close(c2pwrite)
            os.close(errwrite)
            stdin = open(p2cwrite, 'w')
            stdout = open(c2pread, 'r')
            stderr = open(errread, 'r')
            # communicate
            if input:
                stdin.write(input)
                stdin.close()
            self.out = stdout.read()
            stdout.close()
            self.err = stderr.read()
            stderr.read()
            self.status = os.waitpid(pid, 0)[1]

    def __repr__(self):
        fmt = ('STATUS:{}\n' +
               'OUTPUT'.center(80, '-') + '\n{}\n' +
               'ERROR'.center(80, '-') + '\n{}\n')
        return fmt.format(self.status, self.out, self.err)

执行命令试一试:

>>> from popen import popen
>>> popen('python -c "import this"')
STATUS:0
-------------------------------------OUTPUT-------------------------------------
The Zen of Python, by Tim Peters

Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one-- and preferably only one --obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!

-------------------------------------ERROR--------------------------------------


>>> popen('python -c "import that"')
STATUS:256
-------------------------------------OUTPUT-------------------------------------

-------------------------------------ERROR--------------------------------------
Traceback (most recent call last):
  File "<string>", line 1, in <module>
ModuleNotFoundError: No module named 'that'


>>> popen('python --version').out
'Python 3.6.1\n'
>>>

sh 的实现

sh 为 Shell 操作提供了一个非常好用的接口,功能也非常强大。

from sh import ls
print(ls('-l'))

源码:https://github.com/amoffat/sh/blob/1.12.14/sh.py

代码总共 3500 多行,这里只介绍它实现中的关键部分,有兴趣可以详细阅读它的源码。 读代码之前最好先看一下它的文档 Reference,了解各个对象间的逻辑关系以及代码执行流程,避免陷入细节中。

从 sh 模块导入任意命令

我们可以从 sh 模块导入任意命令,而且这些命令都是动态生成的。

下面解析它的实现:

# magicmodule.py
import sys
from types import ModuleType

class MagicModule(ModuleType):

    def __init__(self, name):
        super().__init__(name)

    def __getattr__(self, name):
        return name

sys.modules[__name__] = MagicModule(__name__)

当导入一个模块时,Python 的导入机制会创建一个模块对象存到 sys.modules[__name__] 中,而 from module import attr 就会获取该模块对象的属性。 把系统创建的模块对象替换成实现了 __getattr__ 方法的模块对象,就能导入任意命令了。

>>> from magicmodule import anything
>>> anything
'anything'
>>>

sh 中相关代码在 L3333 SelfWrapper 类 和 L3581 修改 sys.modules

优雅的异常处理

sh 支持这样的用法:

from sh import ErrorReturnCode_12
from sh import SignalException_9
from sh import SignalException_SIGHUP

try:
    some_cmd()
except ErrorReturnCode_12:
    print("couldn't do X")

这些异常类都是动态生成的(L433):

def get_exc_from_name(name):
    """takes an exception name, like:

        ErrorReturnCode_1
        SignalException_9
        SignalException_SIGHUP

    and returns the corresponding exception.  this is primarily used for
    importing exceptions from sh into user code, for instance, to capture those
    exceptions
    """

它用下面这个正则表达式(L424)匹配异常类的名称,取出返回码或者信号码:

rc_exc_regex = re.compile("(ErrorReturnCode|SignalException)_((\d+)|SIG[a-zA-Z]+)")

然后动态生成一个类:

exc = ErrorReturnCodeMeta(name, (base,), {"exit_code": rc})

ErrorReturnCodeMeta(L325)是一个元类,动态生成的异常类是元类的实例。

which 命令

sh 用 Python 实现了 which 命令的功能(L522):

>>> sh.which('ls')
'/usr/bin/ls'

实现原理就是遍历 PATH 环境变量中所有的路径,找到符合要求的可执行文件:

# L528
def is_exe(fpath):
    # 存在 & 可执行 & 是个文件
    return (os.path.exists(fpath) and
            os.access(fpath, os.X_OK) and
            os.path.isfile(os.path.realpath(fpath)))

# L554
for path in paths_to_search:
    exe_file = os.path.join(path, program)
    if is_exe(exe_file):
        found_path = exe_file
        break

后面启动子进程需要用到这个函数。

def resolve_command_path(program):
    # 查找可执行文件
    path = which(program)
    if not path:
        # 替换下划线为横杠,再次尝试
        if "_" in program:
            path = which(program.replace("_", "-"))
        if not path:
            return None
    return path


# 导入命令时调用此函数,创建Command对象
def resolve_command(name, baked_args=None):
    path = resolve_command_path(name)
    cmd = None
    if path:
        cmd = Command(path)
        if baked_args:
            cmd = cmd.bake(**baked_args)
    return cmd

Command 类的实现

接下来先看 Command 类的实现(L1054),所有命令都是这个类的实例。

class Command(object):
    """ represents an un-run system program, like "ls" or "cd". """
    # L1065
    _call_args = {
        "fg": False, # run command in foreground

        # run a command in the background.  commands run in the background
        # ignore SIGHUP and do not automatically exit when the parent process
        # ends
        "bg": False,

        # ...一堆参数
    }
    # L1188
    def __init__(self, path, search_paths=None):
        found = which(path, search_paths)
        # L1209
        self._path = encode_to_py3bytes_or_py2str(found)

它把大量的参数定义放在 _call_args 中,这样有几个好处:

  1. 很方便处理大量参数
  2. 很方便写注释
  3. 可读性更好

因为要允许用户直接创建 Command 对象,所以又调用了一次 which

接着看它怎么处理参数的(L1236):

@staticmethod
def _extract_call_args(kwargs):
    """ takes kwargs that were passed to a command's __call__ and extracts
    out the special keyword arguments, we return a tuple of special keyword
    args, and kwargs that will go to the execd command """

    kwargs = kwargs.copy()
    call_args = {}
    for parg, default in Command._call_args.items():
        key = "_" + parg

        if key in kwargs:
            call_args[parg] = kwargs[key]
            del kwargs[key]

    invalid_kwargs = special_kwarg_validator(call_args,
            Command._kwarg_validators)

    if invalid_kwargs:
        exc_msg = []
        for args, error_msg in invalid_kwargs:
            exc_msg.append("  %r: %s" % (args, error_msg))
        exc_msg = "\n".join(exc_msg)
        raise TypeError("Invalid special arguments:\n\n%s\n" % exc_msg)

    return call_args, kwargs

它将参数分成特殊参数(下划线开头)和普通参数,特殊参数能够控制命令的执行过程, 还能看到它对特殊参数进行了统一校验,出错提示也非常清晰。

Command 对象的 bake 方法,功能类似于 functools.partial:

>>> from sh import ls
>>> lslh = ls.bake('-l', '-h')
>>> lslh()
total 56K
-rw-r--r-- 1 guyskk guyskk  155 May 12 13:00 aaa.json
-rw-r--r-- 1 guyskk guyskk  162 May 12 13:00 bbb.py
...
>>> ls
<Command '/usr/bin/ls'>
>>> lslh
<Command '/usr/bin/ls -l -h'>
>>>

bake 方法的实现(L1265):

def bake(self, *args, **kwargs):
    fn = type(self)(self._path)
    fn._partial = True

    call_args, kwargs = self._extract_call_args(kwargs)

    pruned_call_args = call_args
    for k, v in Command._call_args.items():
        try:
            if pruned_call_args[k] == v:
                del pruned_call_args[k]
        except KeyError:
            continue

    fn._partial_call_args.update(self._partial_call_args)
    fn._partial_call_args.update(pruned_call_args)
    fn._partial_baked_args.extend(self._partial_baked_args)
    sep = pruned_call_args.get("long_sep", self._call_args["long_sep"])
    prefix = pruned_call_args.get("long_prefix",
            self._call_args["long_prefix"])
    fn._partial_baked_args.extend(compile_args(args, kwargs, sep, prefix))
    return fn

_partial_call_args 属性稍后会用到。

Command 是 callable 对象,它的 __call__ 方法实现比较复杂(L1324):

def __call__(self, *args, **kwargs):
    # ...中间的具体实现不太好理解,先看最后一行
    return RunningCommand(cmd, call_args, stdin, stdout, stderr)

RunningCommand 是创建子进程执行命令,所以这里主要是处理参数和三个标准IO。

处理管道命令, 如果第一个参数是正在运行的命令,就复用它的标准输入:

# L1373
# check if we're piping via composition
stdin = call_args["in"]
if args:
    first_arg = args.pop(0)
    if isinstance(first_arg, RunningCommand):
        if first_arg.call_args["piped"]:
            stdin = first_arg.process
        else:
            stdin = first_arg.process._pipe_queue

    else:
        args.insert(0, first_arg)

处理 fg(foreground) 参数:

if call_args["fg"]:
    if call_args["env"] is None:
        launch = lambda: os.spawnv(os.P_WAIT, cmd[0], cmd)
    else:
        launch = lambda: os.spawnve(os.P_WAIT, cmd[0], cmd, call_args["env"])

    exit_code = launch()

os.spawn*os.system 运行的效果差不多,区别在于它不需要通过 sh 进程执行命令。

out 参数(err参数也差不多):

# stdout redirection
stdout = call_args["out"]
if output_redirect_is_filename(stdout):
    stdout = open(str(stdout), "wb")

RunningCommand 的实现

先看接口(L649):

class RunningCommand(object):
    """this represents an executing Command object."""
    def __init__(self, cmd, call_args, stdin, stdout, stderr):
        """
            cmd is an array, where each element is encoded as bytes (PY3) or str
            (PY2)
        """

其实和 Popen 的接口差不多,只是它把一堆参数放在 call_args 里面了。

其中有个 iter 参数允许迭代获取输出,而不是等子进程结束后再一次性获取。

# set up which stream should write to the pipe
# TODO, make pipe None by default and limit the size of the Queue
# in oproc.OProc
pipe = OProc.STDOUT
if call_args["iter"] == "out" or call_args["iter"] is True:
    pipe = OProc.STDOUT
elif call_args["iter"] == "err":
    pipe = OProc.STDERR

通过 OProc 创建进程执行命令(L750),之后等待进程结束:

if spawn_process:
    # this lock is needed because of a race condition where a background
    # thread, created in the OProc constructor, may try to access
    # self.process, but it has not been assigned yet
    process_assign_lock = threading.Lock()
    with process_assign_lock:
        self.process = OProc(self, self.log, cmd, stdin, stdout, stderr,
                self.call_args, pipe, process_assign_lock)

    if should_wait:
        self.wait()

其实 RunningCommand 实现了 __str____repr__ 方法, 所以它看上去像字符串,它也实现了 __iter__ 方法,也就能迭代获取输出。

>>> ret = sh.ls('-l')
>>> type(ret)
<class 'sh.RunningCommand'>
>>> for line in ret:
...     print(line)
total 28392

-rwxr-xr-x   1 guyskk guyskk     8464 Jul  1 18:34 a.out

-rw-r--r--   1 guyskk guyskk      421 Jun  4 22:15 app.py

OProc 的实现

OProc(L1678)封装了创建进程以及进程通信的逻辑,绝大部分特殊参数都是在这处理的。 它的构造函数特别特别长,逻辑太多了。

特殊参数这么多,估计作者也很无奈:

# convenience
ca = self.call_args

这里主要看一下 伪终端 相关的参数

  • _tty_in Default value: False, meaning a os.pipe() will be used.

  • _tty_out Default value: True

If True, sh creates a TTY for STDOUT, otherwise use a os.pipe().

子进程的输入默认是管道,输出默认是伪终端。 伪终端是行缓存模式,所以能不停地取到输出,对比一下前面用 Popen 运行 hello.py 的效果:

>>> proc = sh.python('hello.py', _iter='out')
>>> for line in proc:
...     print(line)
...
hello world

hello world

hello world

sh 把 _tty_out 默认值设为 True 使得它在兼容性方面比 Popen 好很多, Why is _tty_out=True the default?

大致看一下实现:

# L1770
elif ca["tty_in"]:
    self._stdin_read_fd, self._stdin_write_fd = pty.openpty()

# tty_in=False is the default
else:
    self._stdin_write_fd, self._stdin_read_fd = os.pipe()

# L1782
# tty_out=True is the default
elif ca["tty_out"]:
    self._stdout_read_fd, self._stdout_write_fd = pty.openpty()

else:
    self._stdout_read_fd, self._stdout_write_fd = os.pipe()

后面的 fork, exec 和 Popen 几乎一样,就不重复介绍了。

读后感

sh 的代码我看了大约一周才理清其中的逻辑,代码太复杂了。

大致有几个原因:

  • Unix 进程本身的复杂性,概念和暗坑很多
  • 用了一些不为人知的 Python 特性(黑魔法)
  • 源码只有一个文件,3500+ 代码,代码结构不清晰, bottle 项目也是这样的问题
  • 功能太强大了

源码中还有非常多细节我没有提到,其实很多我也不明白,所以只能大致地介绍几个要点, 梳理一下命令的执行过程,希望能有所帮助 ╮( ̄▽ ̄")╭