ACE(2) ACE_Process
原文链接 http://www.k8s.tips/tech/2013/01/14/ACE2_ACE_Process/
注:以下为加速网络访问所做的原文缓存,经过重新格式化,可能存在格式方面的问题,或偶有遗漏信息,请以原文为准。
ACE provides lots of wrapper classes to hide the difference between OS. ACE_Process is one of them to give us an unified operation interface to fork/kill/wait a process. And ACE_Process is also a good example code to show useful function, such as how to check whether a process is alive.
ACE_Process_Options
The ACE_Process_Options is a class that used to set child process’s attributes, such as command line, arguments and environment. In OGL, command_line is used to set task’s binary:
int ACE_Process_Options::command_line(const ACE_ANTI_TCHAR* format, ... );
In JobRunner, m_taskProcessOption points to an ACE_Process_Options object; this member is updated when JobRunner service to a Job (bindJob):
releaseObject<ACE_Process_Options>(m_taskProcessOption);
ACE_NEW_RETURN(m_taskProcessOption, ACE_Process_Options(), -1);
m_taskProcessOption->command_line(jobOption->command());
Spawn child process
ACE_Process is a set of process’s runtime attribute, such as pid; and it’s also a collection of child process operations. JobRunner uses ACE_Process::spawn(…) to execute task:
task.spawn(*(this->m_taskProcessOption));
Re-direct stdio
Pipe is a easy way for parent/child process to exchange data; in OGL, the input/output handles of child process are re-directed to a pipe, which is used to get task input from JobRunner and send the task output back.
ACE_HANDLE taskOutput[2];
ACE_HANDLE taskInput[2];
ACE_OS::pipe(taskOutput);
ACE_OS::pipe(taskInput);
this->m_taskProcessOption->release_handles();
this->m_taskProcessOption->set_handles(taskInput[OGL_PIPE_READ],
taskOutput[OGL_PIPE_WRITE]);
ACE_OS::close(taskInput[OGL_PIPE_READ]);
ACE_OS::close(taskOutput[OGL_PIPE_WRITE]);
task.close_dup_handles();
task.close_passed_handles();
// release the duplicated handles
m_taskProcessOption->release_handles();
Child process operation
Wait child process: just like the wait function in Linux, there is also a wait function in ACE_Process for parent process to synchronize with its child process.
task.wait();
Is child process running: ACE_Process also provide a function to check whether the child process is alive, named running(). The implementing of running function in Linux is kill(pid, 0).
// Return 1 if running; 0 otherwise.
int running (void) const
If sig is 0, then no signal is sent, but error checking is still performed; this can be used to check for the existence of a process ID or process group ID.
Reference man kill for more details.
Code in OGL
int JobRunner::bindJobRunner(ogl::CommandHeader& header, ogl::JobOption* jobOption)
{
OGL_LOG_DEBUG("bind job runner for job id: <%d>, runner id: <%s>",
(int)jobOption->id(),
jobOption->runnerId());
releaseObject<ACE_Process_Options>(m_taskProcessOption);
ACE_NEW_RETURN(m_taskProcessOption, ACE_Process_Options(), -1);
m_taskProcessOption->command_line(jobOption->command());
CommandHeader respHeader(BindJobRunnerComplete, jobOption->runnerId());
return sendResponse(respHeader, jobOption);
}
int JobRunner::executeTask(ogl::CommandHeader& header, ogl::TaskOption* taskOption)
{
OGL_LOG_DEBUG("Execute task for job id: <%d>, task id: <%d>, runner id: <%s>",
(int)taskOption->jobId(),
(int)taskOption->taskId(),
taskOption->runnerId());
ACE_Process task;
ACE_HANDLE taskOutput[2];
ACE_HANDLE taskInput[2];
ACE_OS::pipe(taskOutput);
ACE_OS::pipe(taskInput);
this->m_taskProcessOption->release_handles();
this->m_taskProcessOption->set_handles(taskInput[OGL_PIPE_READ],
taskOutput[OGL_PIPE_WRITE]);
ogl::File inputStream(taskInput[OGL_PIPE_WRITE]);
ogl::File outputStream(taskOutput[OGL_PIPE_READ]);
task.spawn(*(this->m_taskProcessOption));
ACE_OS::close(taskInput[OGL_PIPE_READ]);
ACE_OS::close(taskOutput[OGL_PIPE_WRITE]);
task.close_dup_handles();
task.close_passed_handles();
// release the duplicated handles
m_taskProcessOption->release_handles();
ogl::Buffer& input = taskOption->taskInput();
if (input.size() > 0) {
inputStream.write(input);
}
ogl::Buffer& output = taskOption->taskOutput();
outputStream.read(output);
task.wait();
CommandHeader respHeader(ExecuteTaskComplete, header.contextId());
return sendResponse(respHeader, taskOption);
}