Remove Experimental Worker Queue from CORE - add hook 'daemon_master_release'

This commit is contained in:
M. Dent 2019-01-09 10:16:53 +01:00 committed by Mario
parent 24354b9dcc
commit 5d88326915
3 changed files with 22 additions and 121 deletions

View File

@ -16,8 +16,6 @@ if(array_search( __file__ , get_included_files()) === 0) {
class Master {
static public $queueworker = null;
static public function Summon($arr) {
proc_run('php','Zotlabs/Daemon/Master.php',$arr);
}
@ -25,126 +23,21 @@ class Master {
static public function Release($argc,$argv) {
cli_startup();
$maxworkers = get_config('system','max_queue_workers');
$hookinfo = [
'argv'=>$argv
];
if (!$maxworkers || $maxworkers == 0) {
logger('Master: release: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
$cls = '\\Zotlabs\\Daemon\\' . $argv[0];
$cls::run($argc,$argv);
self::ClearQueue();
} else {
logger('Master: enqueue: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
$workinfo = ['argc'=>$argc,'argv'=>$argv];
q("insert into config (cat,k,v) values ('queuework','%s','%s')",
dbesc(uniqid('workitem:',true)),
dbesc(serialize($workinfo)));
self::Process();
call_hooks ('daemon_master_release',$hookinfo);
$argv = $hookinfo['argv'];
$argc = count($argv);
if ((!is_array($argv) || (count($argv) < 1))) {
return;
}
logger('Master: release: ' . json_encode($argv), LOGGER_ALL,LOG_DEBUG);
$cls = '\\Zotlabs\\Daemon\\' . $argv[0];
$cls::run($argc,$argv);
}
static public function GetWorkerID() {
$maxworkers = get_config('system','max_queue_workers');
$maxworkers = ($maxworkers) ? $maxworkers : 3;
$workermaxage = get_config('system','max_queue_worker_age');
$workermaxage = ($workermaxage) ? $workermaxage : 300;
$workers = q("select * from config where cat='queueworkers' and k like '%s'", 'workerstarted_%');
if (count($workers) > $maxworkers) {
foreach ($workers as $idx => $worker) {
$curtime = time();
$age = (intval($curtime) - intval($worker['v']));
if ( $age > $workermaxage) {
logger("Prune worker: ".$worker['k'], LOGGER_ALL, LOGGER_DEBUG);
$k = explode('_',$worker['k']);
q("delete from config where cat='queueworkers' and k='%s'",
'workerstarted_'.$k[1]);
q("update config set k='%s' where cat='queuework' and k='%s'",
dbesc(uniqid('workitem:',true)),
'workitem_'.$k[1]);
unset($workers[$idx]);
}
}
if (count($workers) > $maxworkers) {
return false;
}
}
return uniqid('',true);
}
static public function Process() {
self::$queueworker = self::GetWorkerID();
if (!self::$queueworker) {
logger('Master: unable to obtain worker ID.');
killme();
}
set_config('queueworkers','workerstarted_'.self::$queueworker,time());
$workersleep = get_config('system','queue_worker_sleep');
$workersleep = ($workersleep) ? $workersleep : 5;
cli_startup();
$work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1",
'workitem_'.self::$queueworker,
dbesc('workitem:%'));
$jobs = 0;
while ($work) {
$workitem = q("select * from config where cat='queuework' and k='%s'",
'workitem_'.self::$queueworker);
if (isset($workitem[0])) {
$jobs++;
$workinfo = unserialize($workitem[0]['v']);
$argc = $workinfo['argc'];
$argv = $workinfo['argv'];
logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
//Delete unclaimed duplicate workitems.
q("delete from config where cat='queuework' and k='workitem' and v='%s'",
serialize($argv));
$cls = '\\Zotlabs\\Daemon\\' . $argv[0];
$cls::run($argc,$argv);
//Right now we assume that if we get a return, everything is OK.
//At some point we may want to test whether the run returns true/false
// and requeue the work to be tried again. But we probably want
// to implement some sort of "retry interval" first.
q("delete from config where cat='queuework' and k='%s'",
'workitem_'.self::$queueworker);
} else {
break;
}
sleep ($workersleep);
$work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1",
'workitem_'.self::$queueworker,
dbesc('workitem:%'));
}
logger('Master: Worker Thread: queue items processed:' . $jobs);
q("delete from config where cat='queueworkers' and k='%s'",
'workerstarted_'.self::$queueworker);
}
static public function ClearQueue() {
$work = q("select * from config where cat='queuework' and k like '%s'",
dbesc('workitem%'));
foreach ($work as $workitem) {
$workinfo = unserialize($workitem['v']);
$argc = $workinfo['argc'];
$argv = $workinfo['argv'];
logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
$cls = '\\Zotlabs\\Daemon\\' . $argv[0];
$cls::run($argc,$argv);
}
$work = q("delete from config where cat='queuework' and k like '%s'",
dbesc('workitem%'));
}
}

View File

@ -0,0 +1,5 @@
[h2]daemon_master_release[/h2]
Permit filtering or alternate methods of processing of background processes when [code] \Zotlabs\Daemon\Master::Release() [/code] is called.
Default behavior is for a new PHP process to fire immediately upon a call to Master::Summon(). This hook permits pre-emption and the ability to provide queuing or other alternatives to this procedure.

View File

@ -190,6 +190,9 @@ Hooks allow plugins/addons to "hook into" the code at many points and alter the
[zrl=[baseurl]/help/hook/daemon_addon]daemon_addon[/zrl]
Called when invoking the extensible background daemon
[zrl=[baseurl]/help/hook/daemon_master_release]daemon_master_release[/zrl]
Called at the start of processing \Zotlabs\Daemon\Master::Release()
[zrl=[baseurl]/help/hook/directory_item]directory_item[/zrl]
Called when generating a directory listing for display