more work on queue optimisations
This commit is contained in:
parent
ef035a29bc
commit
0134a41015
@ -20,6 +20,12 @@ function deliver_run($argv, $argc) {
|
|||||||
dbesc($argv[$x])
|
dbesc($argv[$x])
|
||||||
);
|
);
|
||||||
if($r) {
|
if($r) {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check to see if we have any recent communications with this hub (in the last month).
|
||||||
|
* If not, reduce the outq_priority.
|
||||||
|
*/
|
||||||
|
|
||||||
$h = parse_url($r[0]['outq_posturl']);
|
$h = parse_url($r[0]['outq_posturl']);
|
||||||
if($h) {
|
if($h) {
|
||||||
$base = $h['scheme'] . '://' . $h['host'] . (($h['port']) ? ':' . $h['port'] : '');
|
$base = $h['scheme'] . '://' . $h['host'] . (($h['port']) ? ':' . $h['port'] : '');
|
||||||
@ -38,6 +44,8 @@ function deliver_run($argv, $argc) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// "post" queue driver - used for diaspora and friendica-over-diaspora communications.
|
||||||
|
|
||||||
if($r[0]['outq_driver'] === 'post') {
|
if($r[0]['outq_driver'] === 'post') {
|
||||||
$result = z_post_url($r[0]['outq_posturl'],$r[0]['outq_msg']);
|
$result = z_post_url($r[0]['outq_posturl'],$r[0]['outq_msg']);
|
||||||
if($result['success'] && $result['return_code'] < 300) {
|
if($result['success'] && $result['return_code'] < 300) {
|
||||||
@ -92,9 +100,11 @@ function deliver_run($argv, $argc) {
|
|||||||
logger('deliver: dest: ' . $r[0]['outq_posturl'], LOGGER_DEBUG);
|
logger('deliver: dest: ' . $r[0]['outq_posturl'], LOGGER_DEBUG);
|
||||||
$result = zot_zot($r[0]['outq_posturl'],$r[0]['outq_notify']);
|
$result = zot_zot($r[0]['outq_posturl'],$r[0]['outq_notify']);
|
||||||
if($result['success']) {
|
if($result['success']) {
|
||||||
|
logger('deliver: remote zot delivery succeeded to ' . $r[0]['outq_posturl']);
|
||||||
zot_process_response($r[0]['outq_posturl'],$result, $r[0]);
|
zot_process_response($r[0]['outq_posturl'],$result, $r[0]);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
logger('deliver: remote zot delivery failed to ' . $r[0]['outq_posturl']);
|
||||||
$y = q("update outq set outq_updated = '%s' where outq_hash = '%s'",
|
$y = q("update outq set outq_updated = '%s' where outq_hash = '%s'",
|
||||||
dbesc(datetime_convert()),
|
dbesc(datetime_convert()),
|
||||||
dbesc($argv[$x])
|
dbesc($argv[$x])
|
||||||
|
@ -135,10 +135,25 @@ function notifier_run($argv, $argc){
|
|||||||
));
|
));
|
||||||
if($data) {
|
if($data) {
|
||||||
$result = zot_zot($hh['hubloc_callback'],$data);
|
$result = zot_zot($hh['hubloc_callback'],$data);
|
||||||
// zot_queue_item is not yet written
|
|
||||||
// if(! $result['success'])
|
|
||||||
// zot_queue_item();
|
|
||||||
|
|
||||||
|
// if immediate delivery failed, stick it in the queue to try again later.
|
||||||
|
|
||||||
|
if(! $result['success']) {
|
||||||
|
$hash = random_string();
|
||||||
|
q("insert into outq ( outq_hash, outq_account, outq_channel, outq_driver, outq_posturl, outq_async, outq_created, outq_updated, outq_notify, outq_msg )
|
||||||
|
values ( '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', '%s' )",
|
||||||
|
dbesc($hash),
|
||||||
|
intval($s[0]['channel_account_id']),
|
||||||
|
intval($s[0]['channel_id']),
|
||||||
|
dbesc('zot'),
|
||||||
|
dbesc($hh['hubloc_callback']),
|
||||||
|
intval(1),
|
||||||
|
dbesc(datetime_convert()),
|
||||||
|
dbesc(datetime_convert()),
|
||||||
|
dbesc($data),
|
||||||
|
dbesc('')
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -46,13 +46,18 @@ function queue_run($argv, $argc){
|
|||||||
// the site is permanently down, there's no reason to attempt delivery at all, or at most not more than once
|
// the site is permanently down, there's no reason to attempt delivery at all, or at most not more than once
|
||||||
// or twice a day.
|
// or twice a day.
|
||||||
|
|
||||||
|
// FIXME: can we sort postgres on outq_priority and maintain the 'distinct' ?
|
||||||
|
// The order by max(outq_priority) might be a dodgy query because of the group by.
|
||||||
|
// The desired result is to return a sequence in the order most likely to be delivered in this run.
|
||||||
|
// If a hub has already been sitting in the queue for a few days, they should be delivered last;
|
||||||
|
// hence every failure should drop them further down the priority list.
|
||||||
|
|
||||||
if(ACTIVE_DBTYPE == DBTYPE_POSTGRES) {
|
if(ACTIVE_DBTYPE == DBTYPE_POSTGRES) {
|
||||||
$prefix = 'DISTINCT ON (outq_posturl)';
|
$prefix = 'DISTINCT ON (outq_posturl)';
|
||||||
$suffix = 'ORDER BY outq_posturl';
|
$suffix = 'ORDER BY outq_posturl';
|
||||||
} else {
|
} else {
|
||||||
$prefix = '';
|
$prefix = '';
|
||||||
$suffix = 'GROUP BY outq_posturl';
|
$suffix = 'GROUP BY outq_posturl ORDER BY max(outq_priority)';
|
||||||
}
|
}
|
||||||
$r = q("SELECT $prefix * FROM outq WHERE outq_delivered = 0 and (( outq_created > %s - INTERVAL %s and outq_updated < %s - INTERVAL %s ) OR ( outq_updated < %s - INTERVAL %s )) $suffix",
|
$r = q("SELECT $prefix * FROM outq WHERE outq_delivered = 0 and (( outq_created > %s - INTERVAL %s and outq_updated < %s - INTERVAL %s ) OR ( outq_updated < %s - INTERVAL %s )) $suffix",
|
||||||
db_utcnow(), db_quoteinterval('12 HOUR'),
|
db_utcnow(), db_quoteinterval('12 HOUR'),
|
||||||
@ -77,7 +82,7 @@ function queue_run($argv, $argc){
|
|||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
logger('queue: queue post returned ' . $result['return_code'] . ' from ' . $rr['outq_posturl'],LOGGER_DEBUG);
|
logger('queue: queue post returned ' . $result['return_code'] . ' from ' . $rr['outq_posturl'],LOGGER_DEBUG);
|
||||||
$y = q("update outq set outq_updated = '%s' where outq_hash = '%s'",
|
$y = q("update outq set outq_updated = '%s', outq_priority = outq_priority + 10 where outq_hash = '%s'",
|
||||||
dbesc(datetime_convert()),
|
dbesc(datetime_convert()),
|
||||||
dbesc($rr['outq_hash'])
|
dbesc($rr['outq_hash'])
|
||||||
);
|
);
|
||||||
@ -86,11 +91,13 @@ function queue_run($argv, $argc){
|
|||||||
}
|
}
|
||||||
$result = zot_zot($rr['outq_posturl'],$rr['outq_notify']);
|
$result = zot_zot($rr['outq_posturl'],$rr['outq_notify']);
|
||||||
if($result['success']) {
|
if($result['success']) {
|
||||||
|
logger('queue: deliver zot success to ' . $rr['outq_posturl'], LOGGER_DEBUG);
|
||||||
zot_process_response($rr['outq_posturl'],$result, $rr);
|
zot_process_response($rr['outq_posturl'],$result, $rr);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
$deadguys[] = $rr['outq_posturl'];
|
$deadguys[] = $rr['outq_posturl'];
|
||||||
$y = q("update outq set outq_updated = '%s' where outq_hash = '%s'",
|
logger('queue: deliver zot returned ' . $result['return_code'] . ' from ' . $rr['outq_posturl'],LOGGER_DEBUG);
|
||||||
|
$y = q("update outq set outq_updated = '%s', outq_priority = outq_priority + 10 where outq_hash = '%s'",
|
||||||
dbesc(datetime_convert()),
|
dbesc(datetime_convert()),
|
||||||
dbesc($rr['outq_hash'])
|
dbesc($rr['outq_hash'])
|
||||||
);
|
);
|
||||||
|
@ -614,7 +614,7 @@ function admin_page_queue($a) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
$r = q("select count(outq_posturl) as total, outq_posturl from outq
|
$r = q("select count(outq_posturl) as total, max(outq_priority) as priority, outq_posturl from outq
|
||||||
where outq_delivered = 0 group by outq_posturl order by total desc");
|
where outq_delivered = 0 group by outq_posturl order by total desc");
|
||||||
|
|
||||||
for($x = 0; $x < count($r); $x ++) {
|
for($x = 0; $x < count($r); $x ++) {
|
||||||
@ -626,6 +626,7 @@ function admin_page_queue($a) {
|
|||||||
$o = replace_macros(get_markup_template('admin_queue.tpl'), array(
|
$o = replace_macros(get_markup_template('admin_queue.tpl'), array(
|
||||||
'$banner' => t('Queue Statistics'),
|
'$banner' => t('Queue Statistics'),
|
||||||
'$numentries' => t('Total Entries'),
|
'$numentries' => t('Total Entries'),
|
||||||
|
'$priority' => t('Priority'),
|
||||||
'$desturl' => t('Destination URL'),
|
'$desturl' => t('Destination URL'),
|
||||||
'$nukehub' => t('Mark hub permanently offline'),
|
'$nukehub' => t('Mark hub permanently offline'),
|
||||||
'$empty' => t('Empty queue for this hub'),
|
'$empty' => t('Empty queue for this hub'),
|
||||||
|
@ -2,11 +2,11 @@
|
|||||||
|
|
||||||
{{if $hasentries}}
|
{{if $hasentries}}
|
||||||
|
|
||||||
<table id="admin-queue-table"><tr><td>{{$numentries}} </td><td>{{$desturl}}</td><td> </td><td> </td></tr>
|
<table id="admin-queue-table"><tr><td>{{$numentries}} </td><td>{{$desturl}}</td><td>{{$priority}}</td><td> </td><td> </td></tr>
|
||||||
|
|
||||||
{{foreach $entries as $e}}
|
{{foreach $entries as $e}}
|
||||||
|
|
||||||
<tr><td>{{$e.total}}</td><td>{{$e.outq_posturl}}</td>{{if $expert}}<td><a href="admin/queue?f=&drophub={{$e.eurl}}" title="{{$nukehub}}" class="btn btn-default"><i class="icon-remove"></i><a></td><td><a href="admin/queue?f=&emptyhub={{$e.eurl}}" title="{{$empty}}" class="btn btn-default"><i class="icon-trash"></i></a></td>{{/if}}</tr>
|
<tr><td>{{$e.total}}</td><td>{{$e.outq_posturl}}</td><td>{{$e.priority}}</td>{{if $expert}}<td><a href="admin/queue?f=&drophub={{$e.eurl}}" title="{{$nukehub}}" class="btn btn-default"><i class="icon-remove"></i><a></td><td><a href="admin/queue?f=&emptyhub={{$e.eurl}}" title="{{$empty}}" class="btn btn-default"><i class="icon-trash"></i></a></td>{{/if}}</tr>
|
||||||
{{/foreach}}
|
{{/foreach}}
|
||||||
|
|
||||||
</table>
|
</table>
|
||||||
|
Reference in New Issue
Block a user