PHPにはマルチスレッドっていう概念がないのか?じゃあC#でBackgroundWorkerにやらせていたような非同期処理はいったいどうやってやればいいんだ?とかいう疑問があったけど、プロセス制御のPCNTL関数を使えば似たようなことができるそうだ。

並列処理の実装

PHP 5.3.0 CLI + CentOS 5.4で検証した。eclipseで実行すると、なぜか途中で処理が終了する。コンソールから実行するとちゃんと動いた。ソースからインストールする場合は、configureオプションに--enable-pcntlを付けないと、実行時に関数が見つからなくて、エラーになる。

<?php
//同時に起動する子プロセスの最大数(default)
define('MAX_PROCESS_NUM', 3);
//子プロセスを強制終了するまでの秒数(default)
define('TIMEOUT',120);
class MultiProcess
{
/** @var Array 子プロセスで実行される関数の引数 */
private $args;
/** @var Integer 同時に起動する子プロセスの最大数 */
private $maxProcessNum;
/**
* 子プロセスで実行されるコールバックを設定する。
* @param $callback
*/
public function setWork($callback)
{
$this->work = $callback;
}
/**
* コンストラクタ。
*
* @param Array $args 引数
* @param Integer $maxProcessNum 同時に起動する子プロセスの最大数
*/
public function __construct($args, $maxProcessNum = MAX_PROCESS_NUM)
{
//プロパティ設定
$this->args = $args;
$this->maxProcessNum = $maxProcessNum;
//シグナルにシグナルハンドラを登録
pcntl_signal(SIGTERM, array($this, 'sigHandler'));
pcntl_signal(SIGHUP, array($this, 'sigHandler'));
pcntl_signal(SIGUSR1, array($this, 'sigHandler'));
pcntl_signal(SIGALRM, array($this, 'sigHandler'));
//子プロセスで実行する処理を登録
$this->work = array($this, 'doWork');
}
/**
* デフォルトのシグナルハンドラ。
*
* @param Integer $signo シグナル
*/
private function sigHandler($signo)
{
switch ($signo)
{
case SIGTERM: //シャットダウン
echo "shutdown...\n";
exit;
break;
case SIGHUP: //リブート
echo "reboot...\n";
break;
case SIGUSR1: //ユーザーシグナル
echo "SIGUSER($signo)\n";
break;
case SIGALRM: //アラーム
echo "alarm...\n";
exit;
break;
default: //その他
echo "Other signal: " . $signo . "\n";
}
}
/**
* 子プロセスで実行される処理。
* @param $args
*/
private function doWork($args)
{
$sec = 1;
echo time()." : do work for args($args). do nothing but sleep $sec sec.\n";
sleep($sec);
echo time()." : do work for args($args). finished.\n";
}
/**
* マルチプロセス実行。
*/
public function run()
{
$pchild = 0; //現在起動している子プロセス数
$pnum = 0; //プロセスナンバー
$pend = 0; //終了した子プロセス数
while(TRUE)
{
if(count($this->args) <= $pend) //終了判定
{
echo "loop ends.\n";
break;
}
if( $pchild < $this->maxProcessNum &&
$pnum < count($this->args) ) //最大起動数を越えない場合
{
//子プロセス生成
$pid = pcntl_fork();
if ($pid == -1) //エラー発生時(子プロセスのforkに失敗した場合)
{
throw new Exception('Failed forc process.');
}
else if ($pid) //親プロセス
{
//起動数を追加
$pchild++;
$pnum++;
}
else //子プロセス
{
$arg = $this->args[$pnum];
//TIMEOUT 秒後に強制終了
pcntl_alarm(TIMEOUT);
// 子プロセスで行う仕事
if (!is_array($this->work)) //外部クロージャが設定されている場合
{
$function = $this->work;
$function($arg);
}
else //デフォルト処理を実行、または外部インスタンスのメソッドを実行( $mp->setWork(array($obj,"obj_function")); )
{
$obj = $this->work[0];
$func = $this->work[1];
$obj->$func($arg);
}
// 子プロセスを終了
exit(0);
}
}
else //最大起動数を越えた場合
{
echo "waiting.\n";
$pid = pcntl_waitpid(-1, $status, WUNTRACED);
$pchild--;
$pend++;
echo "$pid stopped.\n";
}
}
echo "kicked all.\n";
}
}
/**
* 別オブジェクトのメソッド実行用クラス。
*/
class extFunc
{
/**
* MultiProcess内部で実行されるメソッド。
* @param $arg
*/
function hello($arg)
{
echo "hello $arg! now that you can execute external object's function! \n";
}
}
/**
* テスト用
*/
function test()
{
//引数リスト
$args = array(
"101", "102", "103", "104", "105", "106", "107"
);
//デフォルトのdoWorkを実行
echo " default doWork.\n";
$mp = new MultiProcess($args);
$mp->run();
//クロージャを設定してdoWorkを実行
echo "\n using Closure.\n";
$func = function($arg)
{
echo "external function.\n";
};
$mp->setWork($func);
$mp->run();
//別オブジェクトのメソッドを設定してdoWorkを実行
echo "\n using external function.\n";
$ext = new extFunc();
$mp->setWork(array($ext, "hello"));
$mp->run();
}
//テスト実行
test();

並列処理が終わった後のコールバックとか、関数が定義されていない場合の検証ロジックなんかが抜けてる。気が向いた人はどうぞ。他にもPROC関数を使うとか、STREAM関数を使うとかの方法があるらしい。けど、このコードで並列処理を実行するという目的は達成できたので、あとは参考までに。

参考