150 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
			
		
		
	
	
			150 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
| <?php
 | |
| 
 | |
| namespace Zotlabs\Daemon;
 | |
| 
 | |
// CLI entry point: this branch runs only when this file is the first entry in
// PHP's list of included files, i.e. it was invoked directly as the script
// rather than loaded on behalf of other code.
if (array_search(__FILE__, get_included_files()) === 0) {

	require_once('include/cli_startup.php');

	// Discard the script path so that $argv[0] is the daemon name that
	// Master::Release() appends to the \Zotlabs\Daemon\ namespace.
	array_shift($argv);
	$argc = count($argv);

	// Nothing to dispatch when no daemon name was supplied.
	if ($argc > 0) {
		Master::Release($argc, $argv);
	}

	killme();
}
 | |
| 
 | |
| 
 | |
| 
 | |
/**
 * Dispatcher for background daemon jobs.
 *
 * A job is described by an argv array whose first element is the short name of
 * a class in the \Zotlabs\Daemon namespace exposing a static run($argc, $argv).
 *
 * Two modes are selected by the config value system.max_queue_workers:
 *  - unset/0: the job runs immediately in the current process.
 *  - non-zero: the job is stored in the config table (cat 'queuework') and a
 *    bounded number of worker processes drain that queue.
 *
 * Keys used in the config table:
 *  - cat 'queuework',    k 'workitem:<uniqid>'        unclaimed job, v = serialized ['argc'=>..,'argv'=>..]
 *  - cat 'queuework',    k 'workitem_<workerid>'      job claimed by a worker
 *  - cat 'queueworkers', k 'workerstarted_<workerid>' v = unix time the worker registered
 */
class Master {

	/** Worker ID of the current process while it drains the queue; assigned by Process(). */
	static public $queueworker = null;

	/**
	 * Launch a background PHP process running this file with the given arguments.
	 *
	 * @param array $arr job arguments; element 0 is the daemon class short name
	 */
	static public function Summon($arr) {
		proc_run('php','Zotlabs/Daemon/Master.php',$arr);
	}

	/**
	 * Run a job now, or enqueue it and start draining the queue, depending on
	 * whether system.max_queue_workers is configured.
	 *
	 * @param int   $argc number of elements in $argv
	 * @param array $argv job arguments; element 0 is the daemon class short name
	 */
	static public function Release($argc,$argv) {
		cli_startup();

		$maxworkers = get_config('system','max_queue_workers');

		if (!$maxworkers) {
			// Queueing disabled: run the job inline, then flush anything that
			// is still sitting in the queue table.
			logger('Master: release: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
			self::RunJob($argc,$argv);
			self::ClearQueue();
		} else {
			logger('Master: enqueue: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
			$workinfo = ['argc'=>$argc,'argv'=>$argv];
			q("insert into config (cat,k,v) values ('queuework','%s','%s')",
				dbesc(uniqid('workitem:',true)),
				dbesc(serialize($workinfo)));
			self::Process();
		}
	}

	/**
	 * Obtain an ID for a new queue worker, or refuse if the worker limit is reached.
	 *
	 * When the number of registered workers has reached the limit, workers
	 * registered for longer than system.max_queue_worker_age seconds
	 * (default 300) are removed and the job they had claimed is put back on
	 * the queue. The limit defaults to 3 when system.max_queue_workers is unset.
	 *
	 * @return string|false a new unique worker ID, or false when no slot is free
	 */
	static public function GetWorkerID() {
		$maxworkers = intval(get_config('system','max_queue_workers'));
		if ($maxworkers <= 0) {
			$maxworkers = 3;
		}

		$workermaxage = intval(get_config('system','max_queue_worker_age'));
		if ($workermaxage <= 0) {
			$workermaxage = 300;
		}

		$workers = q("select * from config where cat='queueworkers' and k like '%s'",
			dbesc('workerstarted_%'));

		// q() may not return a row list (e.g. on failure); treat that as "no workers".
		if (!is_array($workers)) {
			$workers = [];
		}

		// The caller registers itself only after this method returns, so a new
		// ID may be handed out only while the count is strictly below the limit.
		if (count($workers) >= $maxworkers) {
			$curtime = time();
			foreach ($workers as $idx => $worker) {
				$age = $curtime - intval($worker['v']);
				if ($age > $workermaxage) {
					logger("Prune worker: ".$worker['k'], LOGGER_ALL, LOG_DEBUG);
					$k = explode('_',$worker['k'],2);
					$id = isset($k[1]) ? $k[1] : '';
					q("delete from config where cat='queueworkers' and k='%s'",
						dbesc('workerstarted_'.$id));
					// Return the pruned worker's claimed job to the queue under a
					// fresh unclaimed key so that ClaimWork() can pick it up again.
					q("update config set k='%s' where cat='queuework' and k='%s'",
						dbesc(uniqid('workitem:',true)),
						dbesc('workitem_'.$id));
					unset($workers[$idx]);
				}
			}
			if (count($workers) >= $maxworkers) {
				return false;
			}
		}
		return uniqid();
	}

	/**
	 * Register this process as a queue worker and run queued jobs one at a
	 * time until none can be claimed, sleeping system.queue_worker_sleep
	 * seconds (default 5) after each job.
	 */
	static public function Process() {

		self::$queueworker = self::GetWorkerID();

		if (!self::$queueworker) {
			logger('Master: unable to obtain worker ID.');
			killme();
			return;
		}

		set_config('queueworkers','workerstarted_'.self::$queueworker,time());

		$workersleep = get_config('system','queue_worker_sleep');
		$workersleep = ($workersleep) ? $workersleep : 5;
		cli_startup();

		$claimkey = 'workitem_'.self::$queueworker;
		$jobs = 0;

		while (self::ClaimWork($claimkey)) {
			$workitem = q("select * from config where cat='queuework' and k='%s'",
				dbesc($claimkey));

			// The claim query can succeed without renaming a row; no row under
			// our key means the queue is empty.
			if (!(is_array($workitem) && isset($workitem[0]))) {
				break;
			}

			$jobs++;
			$workinfo = self::DecodeWork($workitem[0]['v']);

			if ($workinfo) {
				logger('Master: process: ' . print_r($workinfo['argv'],true), LOGGER_ALL,LOG_DEBUG);

				// Delete unclaimed duplicate workitems. Rows store the whole
				// serialized work description, so match on the stored value.
				q("delete from config where cat='queuework' and (k='workitem' or k like '%s') and v='%s'",
					dbesc('workitem:%'),
					dbesc($workitem[0]['v']));

				self::RunJob($workinfo['argc'],$workinfo['argv']);
			}

			// Right now we assume that if we get a return, everything is OK.
			// At some point we may want to test whether the run returns true/false
			// and requeue the work to be tried again. But we probably want
			// to implement some sort of "retry interval" first.

			q("delete from config where cat='queuework' and k='%s'",
				dbesc($claimkey));

			sleep($workersleep);
		}

		logger('Master: Worker Thread: queue items processed:' . $jobs);
		q("delete from config where cat='queueworkers' and k='%s'",
			dbesc('workerstarted_'.self::$queueworker));
	}

	/**
	 * Run every job currently stored in the queue table (claimed or not) in
	 * this process, then remove all of them.
	 */
	static public function ClearQueue() {
		$work = q("select * from config where cat='queuework' and k like '%s'",
			dbesc('workitem%'));

		if (is_array($work)) {
			foreach ($work as $workitem) {
				$workinfo = self::DecodeWork($workitem['v']);
				if (!$workinfo) {
					continue;
				}
				logger('Master: process: ' . print_r($workinfo['argv'],true), LOGGER_ALL,LOG_DEBUG);
				self::RunJob($workinfo['argc'],$workinfo['argv']);
			}
		}

		q("delete from config where cat='queuework' and k like '%s'",
			dbesc('workitem%'));
	}

	/**
	 * Claim one unclaimed job by renaming its row to the given key.
	 *
	 * NOTE(review): "update ... limit 1" is MySQL/MariaDB syntax - confirm
	 * behaviour on any other database backend in use.
	 *
	 * @param string $claimkey key the claimed row is renamed to
	 * @return mixed result of q(); falsy when the statement failed
	 */
	static private function ClaimWork($claimkey) {
		return q("update config set k='%s' where cat='queuework' and k like '%s' limit 1",
			dbesc($claimkey),
			dbesc('workitem:%'));
	}

	/**
	 * Decode a stored work description.
	 *
	 * NOTE(review): unserialize() is applied to a value read back from the
	 * config table; this assumes only Release() writes those rows.
	 *
	 * @param string $raw serialized ['argc'=>int,'argv'=>array]
	 * @return array|false ['argc'=>..,'argv'=>..], or false when $raw is not a usable work description
	 */
	static private function DecodeWork($raw) {
		$workinfo = unserialize($raw);
		if (!is_array($workinfo) || !isset($workinfo['argv']) || !is_array($workinfo['argv'])) {
			logger('Master: discarding unreadable work item.');
			return false;
		}
		if (!isset($workinfo['argc'])) {
			$workinfo['argc'] = count($workinfo['argv']);
		}
		return $workinfo;
	}

	/**
	 * Invoke \Zotlabs\Daemon\<argv[0]>::run($argc, $argv).
	 *
	 * @param int   $argc number of elements in $argv
	 * @param array $argv job arguments; element 0 is the daemon class short name
	 * @return bool true if the daemon was invoked, false if $argv does not name a runnable daemon
	 */
	static private function RunJob($argc,$argv) {
		if (!is_array($argv) || !isset($argv[0]) || !is_string($argv[0]) || $argv[0] === '') {
			logger('Master: work item does not name a daemon: ' . print_r($argv,true));
			return false;
		}

		// A class name held in a string is always fully qualified, so no
		// leading backslash is needed.
		$cls = 'Zotlabs\\Daemon\\' . $argv[0];

		if (!is_callable([$cls,'run'])) {
			logger('Master: daemon not runnable: ' . $cls);
			return false;
		}

		$cls::run($argc,$argv);
		return true;
	}

}
 |