mail sync/migrate continued; also abstract delivery loop to make it re-usable, change refresh_all to use delivery loop.

This commit is contained in:
redmatrix 2015-10-15 18:52:04 -07:00
parent 7517c76ae4
commit 93f061f78a
9 changed files with 115 additions and 95 deletions

View File

@ -643,7 +643,7 @@ function identity_basic_export($channel_id, $items = false) {
}
$r = q("select mail.*, conv.guid as conv_guid from mail left join conv on mail.convid = conv.id where mail.uid = %d",
$r = q("select * from mail where mail.uid = %d",
intval($channel_id)
);
if($r) {

View File

@ -849,15 +849,6 @@ function import_mail($channel,$mails) {
if(! $m)
continue;
if($mail['conv_guid']) {
$x = q("select id from conv where guid = '%s' and uid = %d limit 1",
dbesc($mail['conv_guid']),
intval($channel['channel_id'])
);
if($x) {
$m['convid'] = $x[0]['id'];
}
}
$m['aid'] = $channel['channel_account_id'];
$m['uid'] = $channel['channel_id'];
mail_store($m);

View File

@ -1618,6 +1618,8 @@ function get_mail_elements($x) {
$arr['body'] = (($x['body']) ? htmlspecialchars($x['body'], ENT_COMPAT,'UTF-8',false) : '');
$arr['title'] = (($x['title'])? htmlspecialchars($x['title'],ENT_COMPAT,'UTF-8',false) : '');
$arr['conv_guid'] = (($x['conv_guid'])? htmlspecialchars($x['conv_guid'],ENT_COMPAT,'UTF-8',false) : '');
$arr['created'] = datetime_convert('UTC','UTC',$x['created']);
if((! array_key_exists('expires',$x)) || ($x['expires'] === NULL_DATE))
$arr['expires'] = NULL_DATE;
@ -1656,6 +1658,7 @@ function get_mail_elements($x) {
if($arr['created'] > datetime_convert())
$arr['created'] = datetime_convert();
$arr['mid'] = (($x['message_id']) ? htmlspecialchars($x['message_id'], ENT_COMPAT,'UTF-8',false) : '');
$arr['parent_mid'] = (($x['message_parent']) ? htmlspecialchars($x['message_parent'], ENT_COMPAT,'UTF-8',false) : '');
@ -3536,6 +3539,7 @@ function mail_store($arr) {
$arr['title'] = ((x($arr,'title')) ? trim($arr['title']) : '');
$arr['parent_mid'] = ((x($arr,'parent_mid')) ? notags(trim($arr['parent_mid'])) : '');
$arr['body'] = ((x($arr,'body')) ? trim($arr['body']) : '');
$arr['conv_guid'] = ((x($arr,'conv_guid')) ? trim($arr['conv_guid']) : '');
$arr['mail_flags'] = ((x($arr,'mail_flags')) ? intval($arr['mail_flags']) : 0 );

View File

@ -28,8 +28,6 @@ function send_message($uid = 0, $recipient='', $body='', $subject='', $replyto='
// $expires = datetime_convert(date_default_timezone_get(),'UTC',$expires);
if($uid) {
$r = q("select * from channel where channel_id = %d limit 1",
intval($uid)
@ -52,17 +50,17 @@ function send_message($uid = 0, $recipient='', $body='', $subject='', $replyto='
$conv_guid = '';
if(strlen($replyto)) {
$r = q("select convid from mail where channel_id = %d and ( mid = '%s' or parent_mid = '%s' ) limit 1",
$r = q("select conv_guid from mail where channel_id = %d and ( mid = '%s' or parent_mid = '%s' ) limit 1",
intval(local_channel()),
dbesc($replyto),
dbesc($replyto)
);
if($r) {
$convid = $r[0]['convid'];
$conv_guid = $r[0]['conv_guid'];
}
}
if(! $convid) {
if(! $conv_guid) {
// create a new conversation
@ -91,33 +89,30 @@ function send_message($uid = 0, $recipient='', $body='', $subject='', $replyto='
dbesc($handles)
);
$r = q("select * from conv where guid = '%s' and uid = %d limit 1",
dbesc($conv_guid),
intval(local_channel())
);
if($r) {
$convid = $r[0]['id'];
$retconv = $r[0];
}
}
if(! $convid) {
$ret['message'] = 'conversation not found';
return $ret;
}
if(! $conv_guid) {
$r = q("select * from conv where id = %d and uid = %d limit 1",
intval($convid),
if(! $retconv) {
$r = q("select * from conv where guid = '%s' and uid = %d limit 1",
dbesc($conv_guid),
intval(local_channel())
);
if($r) {
$conv_guid = $r[0]['guid'];
$retconv = $r[0];
}
}
if(! $retconv) {
$ret['message'] = 'conversation not found';
return $ret;
}
// generate a unique message_id
do {
@ -189,10 +184,10 @@ function send_message($uid = 0, $recipient='', $body='', $subject='', $replyto='
$r = q("INSERT INTO mail ( account_id, convid, mail_obscured, channel_id, from_xchan, to_xchan, title, body, attach, mid, parent_mid, created, expires )
VALUES ( %d, %d, %d, %d, '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s' )",
$r = q("INSERT INTO mail ( account_id, conv_guid, mail_obscured, channel_id, from_xchan, to_xchan, title, body, attach, mid, parent_mid, created, expires )
VALUES ( %d, '%s', %d, %d, '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s' )",
intval($channel['channel_account_id']),
intval($convid),
dbesc($conv_guid),
intval(1),
intval($channel['channel_id']),
dbesc($channel['channel_hash']),
@ -215,7 +210,6 @@ function send_message($uid = 0, $recipient='', $body='', $subject='', $replyto='
if($r) {
$post_id = $r[0]['id'];
$retmail = $r;
$retmail['conv_guid'] = $conv_guid;
}
else {
$ret['message'] = t('Stored post could not be verified.');
@ -260,10 +254,9 @@ function send_message($uid = 0, $recipient='', $body='', $subject='', $replyto='
$ret['success'] = true;
$ret['message_item'] = intval($post_id);
if($retconv)
$ret['conv'] = $retconv;
if($retmail)
$ret['mail'] = $retmail;
$ret['conv'] = $retconv;
$ret['mail'] = $retmail;
return $ret;
}
@ -391,7 +384,7 @@ function private_messages_drop($channel_id, $messageitem_id, $drop_conversation
if($drop_conversation) {
// find the parent_id
$p = q("SELECT parent_mid, convid FROM mail WHERE id = %d AND channel_id = %d LIMIT 1",
$p = q("SELECT parent_mid, conv_guid FROM mail WHERE id = %d AND channel_id = %d LIMIT 1",
intval($messageitem_id),
intval($channel_id)
);

View File

@ -1677,13 +1677,40 @@ function format_and_send_email($sender,$xchan,$item) {
'additionalMailHeader' => '',
));
}
function do_delivery($deliveries) {
if(! (is_array($deliveries) && count($deliveries)))
return;
$interval = ((get_config('system','delivery_interval') !== false)
? intval(get_config('system','delivery_interval')) : 2 );
$deliveries_per_process = intval(get_config('system','delivery_batch_count'));
if($deliveries_per_process <= 0)
$deliveries_per_process = 1;
$deliver = array();
foreach($deliveries as $d) {
$deliver[] = $d;
if(count($deliver) >= $deliveries_per_process) {
proc_run('php','include/deliver.php',$deliver);
$deliver = array();
if($interval)
@time_sleep_until(microtime(true) + (float) $interval);
}
}
// catch any stragglers
if($deliver)
proc_run('php','include/deliver.php',$deliver);
}

View File

@ -96,6 +96,18 @@ function notifier_run($argv, $argc){
require_once('include/identity.php');
$sys = get_sys_channel();
$deliveries = array();
$dead_hubs = array();
$dh = q("select site_url from site where site_dead = 1");
if(dh) {
foreach($dh as $dead) {
$dead_hubs[] = $dead['site_url'];
}
}
if($cmd == 'permission_update') {
// Get the recipient
$r = q("select abook.*, hubloc.* from abook
@ -113,8 +125,11 @@ function notifier_run($argv, $argc){
intval($r[0]['abook_channel'])
);
if($s) {
$perm_update = array('sender' => $s[0], 'recipient' => $r[0], 'success' => false);
$perm_update = array('sender' => $s[0], 'recipient' => $r[0], 'success' => false, 'deliveries' => '');
call_hooks('permissions_update',$perm_update);
if($perm_update['success'] && $perm_update['deliveries'])
$deliveries[] = $perm_update['deliveries'];
if(! $perm_update['success']) {
// send a refresh message to each hub they have registered here
$h = q("select * from hubloc where hubloc_hash = '%s'
@ -125,36 +140,40 @@ function notifier_run($argv, $argc){
);
if($h) {
foreach($h as $hh) {
if(in_array($hh['hubloc_url'],$dead_hubs)) {
logger('skipping dead hub: ' . $hh['hubloc_url'], LOGGER_DEBUG);
continue;
}
$data = zot_build_packet($s[0],'refresh',array(array(
'guid' => $hh['hubloc_guid'],
'guid_sig' => $hh['hubloc_guid_sig'],
'url' => $hh['hubloc_url'])
));
if($data) {
$result = zot_zot($hh['hubloc_callback'],$data);
// if immediate delivery failed, stick it in the queue to try again later.
if(! $result['success']) {
$hash = random_string();
q("insert into outq ( outq_hash, outq_account, outq_channel, outq_driver, outq_posturl, outq_async, outq_created, outq_updated, outq_notify, outq_msg )
values ( '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', '%s' )",
dbesc($hash),
intval($s[0]['channel_account_id']),
intval($s[0]['channel_id']),
dbesc('zot'),
dbesc($hh['hubloc_callback']),
intval(1),
dbesc(datetime_convert()),
dbesc(datetime_convert()),
dbesc($data),
dbesc('')
);
}
$hash = random_string();
q("insert into outq ( outq_hash, outq_account, outq_channel, outq_driver, outq_posturl, outq_async, outq_created, outq_updated, outq_notify, outq_msg )
values ( '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', '%s' )",
dbesc($hash),
intval($s[0]['channel_account_id']),
intval($s[0]['channel_id']),
dbesc('zot'),
dbesc($hh['hubloc_callback']),
intval(1),
dbesc(datetime_convert()),
dbesc(datetime_convert()),
dbesc($data),
dbesc('')
);
$deliveries[] = $hash;
}
}
}
}
}
if($deliveries)
do_delivery($deliveries);
}
}
return;
@ -524,14 +543,6 @@ function notifier_run($argv, $argc){
$hubs = $r;
$dead_hubs = array();
$dh = q("select site_url from site where site_dead = 1");
if(dh) {
foreach($dh as $dead) {
$dead_hubs[] = $dead['site_url'];
}
}
/**
@ -571,15 +582,6 @@ function notifier_run($argv, $argc){
logger('notifier: will notify/deliver to these hubs: ' . print_r($hublist,true), LOGGER_DEBUG);
$interval = ((get_config('system','delivery_interval') !== false)
? intval(get_config('system','delivery_interval')) : 2 );
$deliveries_per_process = intval(get_config('system','delivery_batch_count'));
if($deliveries_per_process <= 0)
$deliveries_per_process = 1;
$deliveries = array();
foreach($dhubs as $hub) {
@ -690,26 +692,8 @@ function notifier_run($argv, $argc){
proc_run('php','include/deliver_hooks.php', $target_item['id']);
}
if($deliveries) {
$deliver = array();
foreach($deliveries as $d) {
$deliver[] = $d;
if(count($deliver) >= $deliveries_per_process) {
proc_run('php','include/deliver.php',$deliver);
$deliver = array();
if($interval)
@time_sleep_until(microtime(true) + (float) $interval);
}
}
}
// catch any stragglers
if($deliver)
proc_run('php','include/deliver.php',$deliver);
if($deliveries)
do_delivery($deliveries);
logger('notifier: basic loop complete.', LOGGER_DEBUG);

View File

@ -731,6 +731,7 @@ CREATE TABLE IF NOT EXISTS `likes` (
CREATE TABLE IF NOT EXISTS `mail` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`convid` int(10) unsigned NOT NULL DEFAULT '0',
`conv_guid` char(255) NOT NULL DEFAULT '',
`mail_flags` int(10) unsigned NOT NULL DEFAULT '0',
`from_xchan` char(255) NOT NULL DEFAULT '',
`to_xchan` char(255) NOT NULL DEFAULT '',
@ -761,6 +762,7 @@ CREATE TABLE IF NOT EXISTS `mail` (
KEY `parent_mid` (`parent_mid`),
KEY `expires` (`expires`),
KEY `convid` (`convid`),
KEY `conv_guid` (`conv_guid`),
KEY `mail_deleted` (`mail_deleted`),
KEY `mail_replied` (`mail_replied`),
KEY `mail_isreply` (`mail_isreply`),

View File

@ -728,6 +728,7 @@ create index "likes_target_id" on likes ("target_id");
CREATE TABLE "mail" (
"id" serial NOT NULL,
"convid" bigint NOT NULL DEFAULT '0',
"conv_guid" text NOT NULL,
"mail_flags" bigint NOT NULL DEFAULT '0',
"from_xchan" text NOT NULL DEFAULT '',
"to_xchan" text NOT NULL DEFAULT '',
@ -750,6 +751,7 @@ CREATE TABLE "mail" (
PRIMARY KEY ("id")
);
create index "mail_convid" on mail ("convid");
create index "mail_conv_guid" on mail ("conv_guid");
create index "mail_created" on mail ("created");
create index "mail_flags" on mail ("mail_flags");
create index "mail_account_id" on mail ("account_id");

View File

@ -1887,6 +1887,23 @@ function update_r1155() {
function update_r1156() {
return UPDATE_SUCCESS;
$r1 = q("ALTER TABLE mail ADD conv_guid CHAR( 255 ) NOT NULL DEFAULT '' ");
$r2 = q("create index conv_guid on mail ( conv_guid ) ");
$r3 = q("select mail.id, mail.convid, conv.guid from mail left join conv on mail.convid = conv.id where true");
if($r3) {
foreach($r3 as $rr) {
if($rr['convid']) {
q("update mail set conv_guid = '%s' where id = %d",
dbesc($rr['guid']),
intval($rr['id'])
);
}
}
}
if($r1 && $r2)
return UPDATE_SUCCESS;
return UPDATE_FAILED;
}