151 lines
4.5 KiB
PHP
151 lines
4.5 KiB
PHP
<?php
|
|
|
|
namespace Zotlabs\Daemon;
|
|
|
|
if(array_search( __file__ , get_included_files()) === 0) {
|
|
require_once('include/cli_startup.php');
|
|
array_shift($argv);
|
|
$argc = count($argv);
|
|
|
|
if($argc)
|
|
Master::Release($argc,$argv);
|
|
killme();
|
|
}
|
|
|
|
|
|
|
|
class Master {
|
|
|
|
static public $queueworker = null;
|
|
|
|
static public function Summon($arr) {
|
|
proc_run('php','Zotlabs/Daemon/Master.php',$arr);
|
|
}
|
|
|
|
static public function Release($argc,$argv) {
|
|
cli_startup();
|
|
|
|
$maxworkers = get_config('system','max_queue_workers');
|
|
|
|
if (!$maxworkers || $maxworkers == 0) {
|
|
logger('Master: release: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
|
|
$cls = '\\Zotlabs\\Daemon\\' . $argv[0];
|
|
$cls::run($argc,$argv);
|
|
self::ClearQueue();
|
|
} else {
|
|
logger('Master: enqueue: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
|
|
$workinfo = ['argc'=>$argc,'argv'=>$argv];
|
|
q("insert into config (cat,k,v) values ('queuework','%s','%s')",
|
|
dbesc(uniqid('workitem:',true)),
|
|
dbesc(serialize($workinfo)));
|
|
self::Process();
|
|
}
|
|
}
|
|
|
|
static public function GetWorkerID() {
|
|
$maxworkers = get_config('system','max_queue_workers');
|
|
$maxworkers = ($maxworkers) ? $maxworkers : 3;
|
|
|
|
$workermaxage = get_config('system','max_queue_worker_age');
|
|
$workermaxage = ($workermaxage) ? $workermaxage : 300;
|
|
|
|
$workers = q("select * from config where cat='queueworkers' and k like '%s'", 'workerstarted_%');
|
|
|
|
if (count($workers) > $maxworkers) {
|
|
foreach ($workers as $idx => $worker) {
|
|
$curtime = time();
|
|
$age = (intval($curtime) - intval($worker['v']));
|
|
if ( $age > $workermaxage) {
|
|
logger("Prune worker: ".$worker['k'], LOGGER_ALL, LOGGER_DEBUG);
|
|
$k = explode('_',$worker['k']);
|
|
q("delete from config where cat='queueworkers' and k='%s'",
|
|
'workerstarted_'.$k[1]);
|
|
q("update config set k='%s' where cat='queuework' and k='%s'",
|
|
dbesc(uniqid('workitem:',true)),
|
|
'workitem_'.$k[1]);
|
|
unset($workers[$idx]);
|
|
}
|
|
}
|
|
if (count($workers) > $maxworkers) {
|
|
return false;
|
|
}
|
|
}
|
|
return uniqid('',true);
|
|
|
|
}
|
|
|
|
static public function Process() {
|
|
|
|
self::$queueworker = self::GetWorkerID();
|
|
|
|
if (!self::$queueworker) {
|
|
logger('Master: unable to obtain worker ID.');
|
|
killme();
|
|
}
|
|
|
|
set_config('queueworkers','workerstarted_'.self::$queueworker,time());
|
|
|
|
$workersleep = get_config('system','queue_worker_sleep');
|
|
$workersleep = ($workersleep) ? $workersleep : 5;
|
|
cli_startup();
|
|
|
|
$work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1",
|
|
'workitem_'.self::$queueworker,
|
|
dbesc('workitem:%'));
|
|
$jobs = 0;
|
|
while ($work) {
|
|
$workitem = q("select * from config where cat='queuework' and k='%s'",
|
|
'workitem_'.self::$queueworker);
|
|
|
|
if (isset($workitem[0])) {
|
|
$jobs++;
|
|
$workinfo = unserialize($workitem[0]['v']);
|
|
$argc = $workinfo['argc'];
|
|
$argv = $workinfo['argv'];
|
|
logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
|
|
|
|
//Delete unclaimed duplicate workitems.
|
|
q("delete from config where cat='queuework' and k='workitem' and v='%s'",
|
|
serialize($argv));
|
|
|
|
$cls = '\\Zotlabs\\Daemon\\' . $argv[0];
|
|
$cls::run($argc,$argv);
|
|
|
|
//Right now we assume that if we get a return, everything is OK.
|
|
//At some point we may want to test whether the run returns true/false
|
|
// and requeue the work to be tried again. But we probably want
|
|
// to implement some sort of "retry interval" first.
|
|
|
|
q("delete from config where cat='queuework' and k='%s'",
|
|
'workitem_'.self::$queueworker);
|
|
} else {
|
|
break;
|
|
}
|
|
sleep ($workersleep);
|
|
$work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1",
|
|
'workitem_'.self::$queueworker,
|
|
dbesc('workitem:%'));
|
|
|
|
}
|
|
logger('Master: Worker Thread: queue items processed:' . $jobs);
|
|
q("delete from config where cat='queueworkers' and k='%s'",
|
|
'workerstarted_'.self::$queueworker);
|
|
}
|
|
|
|
static public function ClearQueue() {
|
|
$work = q("select * from config where cat='queuework' and k like '%s'",
|
|
dbesc('workitem%'));
|
|
foreach ($work as $workitem) {
|
|
$workinfo = unserialize($workitem['v']);
|
|
$argc = $workinfo['argc'];
|
|
$argv = $workinfo['argv'];
|
|
logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
|
|
$cls = '\\Zotlabs\\Daemon\\' . $argv[0];
|
|
$cls::run($argc,$argv);
|
|
}
|
|
$work = q("delete from config where cat='queuework' and k like '%s'",
|
|
dbesc('workitem%'));
|
|
}
|
|
|
|
}
|