Compare commits
5 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
a0f7453c6e | ||
|
46a1e16f21 | ||
|
f3f7a3840a | ||
|
7539254e96 | ||
|
456478c4e1 |
3
Gemfile
3
Gemfile
@@ -38,7 +38,7 @@ gem 'link_header'
|
||||
gem 'local_time'
|
||||
gem 'nokogiri'
|
||||
gem 'oj'
|
||||
gem 'ostatus2', '~> 1.1'
|
||||
gem 'ostatus2', '~> 2.0'
|
||||
gem 'ox'
|
||||
gem 'rabl'
|
||||
gem 'rack-attack'
|
||||
@@ -51,6 +51,7 @@ gem 'rqrcode'
|
||||
gem 'ruby-oembed', require: 'oembed'
|
||||
gem 'sanitize'
|
||||
gem 'sidekiq'
|
||||
gem 'sidekiq-scheduler'
|
||||
gem 'sidekiq-unique-jobs'
|
||||
gem 'simple-navigation'
|
||||
gem 'simple_form'
|
||||
|
14
Gemfile.lock
14
Gemfile.lock
@@ -153,6 +153,8 @@ GEM
|
||||
thread_safe
|
||||
encryptor (3.0.0)
|
||||
erubis (2.7.0)
|
||||
et-orbi (1.0.3)
|
||||
tzinfo
|
||||
execjs (2.7.0)
|
||||
fabrication (2.16.1)
|
||||
faker (1.7.3)
|
||||
@@ -265,7 +267,7 @@ GEM
|
||||
oj (3.0.2)
|
||||
openssl (2.0.3)
|
||||
orm_adapter (0.5.0)
|
||||
ostatus2 (1.1.0)
|
||||
ostatus2 (2.0.0)
|
||||
addressable (~> 2.4)
|
||||
http (~> 2.0)
|
||||
nokogiri (~> 1.6)
|
||||
@@ -402,6 +404,8 @@ GEM
|
||||
unicode-display_width (~> 1.0, >= 1.0.1)
|
||||
ruby-oembed (0.12.0)
|
||||
ruby-progressbar (1.8.1)
|
||||
rufus-scheduler (3.4.0)
|
||||
et-orbi (~> 1.0)
|
||||
safe_yaml (1.0.4)
|
||||
sanitize (4.4.0)
|
||||
crass (~> 1.0.2)
|
||||
@@ -419,6 +423,11 @@ GEM
|
||||
connection_pool (~> 2.2, >= 2.2.0)
|
||||
rack-protection (>= 1.5.0)
|
||||
redis (~> 3.2, >= 3.2.1)
|
||||
sidekiq-scheduler (2.1.4)
|
||||
redis (~> 3)
|
||||
rufus-scheduler (~> 3.2)
|
||||
sidekiq (>= 3)
|
||||
tilt (>= 1.4.0)
|
||||
sidekiq-unique-jobs (5.0.0)
|
||||
sidekiq (>= 4.0)
|
||||
thor
|
||||
@@ -523,7 +532,7 @@ DEPENDENCIES
|
||||
microformats2
|
||||
nokogiri
|
||||
oj
|
||||
ostatus2 (~> 1.1)
|
||||
ostatus2 (~> 2.0)
|
||||
ox
|
||||
paperclip (~> 5.1)
|
||||
paperclip-av-transcoder
|
||||
@@ -552,6 +561,7 @@ DEPENDENCIES
|
||||
sanitize
|
||||
sass-rails (~> 5.0)
|
||||
sidekiq
|
||||
sidekiq-scheduler
|
||||
sidekiq-unique-jobs
|
||||
simple-navigation
|
||||
simple_form
|
||||
|
2
Procfile
2
Procfile
@@ -1,2 +1,2 @@
|
||||
web: bundle exec puma -C config/puma.rb
|
||||
worker: bundle exec sidekiq -q default -q push -q pull -q mailers
|
||||
worker: bundle exec sidekiq
|
||||
|
@@ -5,13 +5,13 @@ class Api::SalmonController < ApiController
|
||||
respond_to :txt
|
||||
|
||||
def update
|
||||
body = request.body.read
|
||||
payload = request.body.read
|
||||
|
||||
if body.nil?
|
||||
head 200
|
||||
else
|
||||
SalmonWorker.perform_async(@account.id, body.force_encoding('UTF-8'))
|
||||
if !payload.nil? && verify?(payload)
|
||||
SalmonWorker.perform_async(@account.id, payload.force_encoding('UTF-8'))
|
||||
head 201
|
||||
else
|
||||
head 202
|
||||
end
|
||||
end
|
||||
|
||||
@@ -20,4 +20,8 @@ class Api::SalmonController < ApiController
|
||||
def set_account
|
||||
@account = Account.find(params[:id])
|
||||
end
|
||||
|
||||
def verify?(payload)
|
||||
VerifySalmonService.new.call(payload)
|
||||
end
|
||||
end
|
||||
|
@@ -19,10 +19,9 @@ class Api::SubscriptionsController < ApiController
|
||||
|
||||
if subscription.verify(body, request.headers['HTTP_X_HUB_SIGNATURE'])
|
||||
ProcessingWorker.perform_async(@account.id, body.force_encoding('UTF-8'))
|
||||
head 201
|
||||
else
|
||||
head 202
|
||||
end
|
||||
|
||||
head 200
|
||||
end
|
||||
|
||||
private
|
||||
|
@@ -1,13 +1,17 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module HttpHelper
|
||||
USER_AGENT = "#{HTTP::Request::USER_AGENT} (Mastodon/#{Mastodon::Version}; +http://#{Rails.configuration.x.local_domain}/)"
|
||||
|
||||
def http_client(options = {})
|
||||
timeout = { write: 10, connect: 10, read: 10 }.merge(options)
|
||||
|
||||
HTTP.headers(user_agent: USER_AGENT)
|
||||
HTTP.headers(user_agent: user_agent)
|
||||
.timeout(:per_operation, timeout)
|
||||
.follow
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def user_agent
|
||||
@user_agent ||= "#{HTTP::Request::USER_AGENT} (Mastodon/#{Mastodon::Version}; +http://#{Rails.configuration.x.local_domain}/)"
|
||||
end
|
||||
end
|
||||
|
@@ -65,8 +65,10 @@ class TagManager
|
||||
end
|
||||
|
||||
def normalize_domain(domain)
|
||||
return if domain.nil?
|
||||
|
||||
uri = Addressable::URI.new
|
||||
uri.host = domain
|
||||
uri.host = domain.gsub(/[\/]/, '')
|
||||
uri.normalize.host
|
||||
end
|
||||
|
||||
|
@@ -65,9 +65,10 @@ class Account < ApplicationRecord
|
||||
|
||||
scope :remote, -> { where.not(domain: nil) }
|
||||
scope :local, -> { where(domain: nil) }
|
||||
scope :without_followers, -> { where('(select count(f.id) from follows as f where f.target_account_id = accounts.id) = 0') }
|
||||
scope :with_followers, -> { where('(select count(f.id) from follows as f where f.target_account_id = accounts.id) > 0') }
|
||||
scope :without_followers, -> { where(followers_count: 0) }
|
||||
scope :with_followers, -> { where('followers_count > 0') }
|
||||
scope :expiring, ->(time) { where(subscription_expires_at: nil).or(where('subscription_expires_at < ?', time)).remote.with_followers }
|
||||
scope :partitioned, -> { order('row_number() over (partition by domain)') }
|
||||
scope :silenced, -> { where(silenced: true) }
|
||||
scope :suspended, -> { where(suspended: true) }
|
||||
scope :recent, -> { reorder(id: :desc) }
|
||||
|
21
app/services/concerns/author_extractor.rb
Normal file
21
app/services/concerns/author_extractor.rb
Normal file
@@ -0,0 +1,21 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module AuthorExtractor
|
||||
def author_from_xml(xml)
|
||||
# Try <email> for acct
|
||||
acct = xml.at_xpath('./xmlns:author/xmlns:email', xmlns: TagManager::XMLNS)&.content
|
||||
|
||||
# Try <name> + <uri>
|
||||
if acct.blank?
|
||||
username = xml.at_xpath('./xmlns:author/xmlns:name', xmlns: TagManager::XMLNS)&.content
|
||||
uri = xml.at_xpath('./xmlns:author/xmlns:uri', xmlns: TagManager::XMLNS)&.content
|
||||
|
||||
return nil if username.blank? || uri.blank?
|
||||
|
||||
domain = Addressable::URI.parse(uri).normalize.host
|
||||
acct = "#{username}@#{domain}"
|
||||
end
|
||||
|
||||
FollowRemoteAccountService.new.call(acct)
|
||||
end
|
||||
end
|
@@ -1,6 +1,8 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class FetchRemoteAccountService < BaseService
|
||||
include AuthorExtractor
|
||||
|
||||
def call(url, prefetched_body = nil)
|
||||
if prefetched_body.nil?
|
||||
atom_url, body = FetchAtomService.new.call(url)
|
||||
@@ -19,21 +21,10 @@ class FetchRemoteAccountService < BaseService
|
||||
xml = Nokogiri::XML(body)
|
||||
xml.encoding = 'utf-8'
|
||||
|
||||
email = xml.at_xpath('//xmlns:author/xmlns:email').try(:content)
|
||||
if email.nil?
|
||||
url_parts = Addressable::URI.parse(url).normalize
|
||||
username = xml.at_xpath('//xmlns:author/xmlns:name').try(:content)
|
||||
domain = url_parts.host
|
||||
else
|
||||
username, domain = email.split('@')
|
||||
end
|
||||
account = author_from_xml(xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS))
|
||||
|
||||
return nil if username.nil? || domain.nil?
|
||||
|
||||
Rails.logger.debug "Going to webfinger #{username}@#{domain}"
|
||||
|
||||
account = FollowRemoteAccountService.new.call("#{username}@#{domain}")
|
||||
UpdateRemoteProfileService.new.call(xml, account) unless account.nil?
|
||||
|
||||
account
|
||||
rescue TypeError
|
||||
Rails.logger.debug "Unparseable URL given: #{url}"
|
||||
|
@@ -1,6 +1,8 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class FetchRemoteStatusService < BaseService
|
||||
include AuthorExtractor
|
||||
|
||||
def call(url, prefetched_body = nil)
|
||||
if prefetched_body.nil?
|
||||
atom_url, body = FetchAtomService.new.call(url)
|
||||
@@ -21,37 +23,19 @@ class FetchRemoteStatusService < BaseService
|
||||
xml = Nokogiri::XML(body)
|
||||
xml.encoding = 'utf-8'
|
||||
|
||||
account = extract_author(url, xml)
|
||||
account = author_from_xml(xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS))
|
||||
domain = Addressable::URI.parse(url).normalize.host
|
||||
|
||||
return nil if account.nil?
|
||||
return nil unless !account.nil? && confirmed_domain?(domain, account)
|
||||
|
||||
statuses = ProcessFeedService.new.call(body, account)
|
||||
|
||||
statuses.first
|
||||
end
|
||||
|
||||
def extract_author(url, xml)
|
||||
url_parts = Addressable::URI.parse(url).normalize
|
||||
username = xml.at_xpath('//xmlns:author/xmlns:name').try(:content)
|
||||
domain = url_parts.host
|
||||
|
||||
return nil if username.nil?
|
||||
|
||||
Rails.logger.debug "Going to webfinger #{username}@#{domain}"
|
||||
|
||||
account = FollowRemoteAccountService.new.call("#{username}@#{domain}")
|
||||
|
||||
# If the author's confirmed URLs do not match the domain of the URL
|
||||
# we are reading this from, abort
|
||||
return nil unless confirmed_domain?(domain, account)
|
||||
|
||||
account
|
||||
rescue Nokogiri::XML::XPath::SyntaxError
|
||||
Rails.logger.debug 'Invalid XML or missing namespace'
|
||||
nil
|
||||
end
|
||||
|
||||
def confirmed_domain?(domain, account)
|
||||
domain.casecmp(account.domain).zero? || domain.casecmp(Addressable::URI.parse(account.remote_url).normalize.host).zero?
|
||||
account.domain.nil? || domain.casecmp(account.domain).zero? || domain.casecmp(Addressable::URI.parse(account.remote_url).normalize.host).zero?
|
||||
end
|
||||
end
|
||||
|
@@ -40,7 +40,7 @@ class FollowService < BaseService
|
||||
if target_account.local?
|
||||
NotifyService.new.call(target_account, follow)
|
||||
else
|
||||
SubscribeService.new.call(target_account) unless target_account.subscribed?
|
||||
Pubsubhubbub::SubscribeWorker.perform_async(target_account.id) unless target_account.subscribed?
|
||||
NotificationWorker.perform_async(build_follow_xml(follow), source_account.id, target_account.id)
|
||||
AfterRemoteFollowWorker.perform_async(follow.id)
|
||||
end
|
||||
|
@@ -20,6 +20,8 @@ class ProcessFeedService < BaseService
|
||||
end
|
||||
|
||||
class ProcessEntry
|
||||
include AuthorExtractor
|
||||
|
||||
def call(xml, account)
|
||||
@account = account
|
||||
@xml = xml
|
||||
@@ -40,6 +42,11 @@ class ProcessFeedService < BaseService
|
||||
private
|
||||
|
||||
def create_status
|
||||
if redis.exists("delete_upon_arrival:#{id}")
|
||||
Rails.logger.debug "Delete for status #{id} was queued, ignoring"
|
||||
return
|
||||
end
|
||||
|
||||
Rails.logger.debug "Creating remote status #{id}"
|
||||
status, just_created = status_from_xml(@xml)
|
||||
|
||||
@@ -82,7 +89,13 @@ class ProcessFeedService < BaseService
|
||||
def delete_status
|
||||
Rails.logger.debug "Deleting remote status #{id}"
|
||||
status = Status.find_by(uri: id)
|
||||
RemoveStatusService.new.call(status) unless status.nil?
|
||||
|
||||
if status.nil?
|
||||
redis.setex("delete_upon_arrival:#{id}", 6 * 3_600, id)
|
||||
else
|
||||
RemoveStatusService.new.call(status)
|
||||
end
|
||||
|
||||
nil
|
||||
end
|
||||
|
||||
@@ -108,7 +121,7 @@ class ProcessFeedService < BaseService
|
||||
# If that author cannot be found, don't record the status (do not misattribute)
|
||||
if account?(entry)
|
||||
begin
|
||||
account = find_or_resolve_account(acct(entry))
|
||||
account = author_from_xml(entry)
|
||||
return [nil, false] if account.nil?
|
||||
rescue Goldfinger::Error
|
||||
return [nil, false]
|
||||
@@ -143,10 +156,6 @@ class ProcessFeedService < BaseService
|
||||
[status, true]
|
||||
end
|
||||
|
||||
def find_or_resolve_account(acct)
|
||||
FollowRemoteAccountService.new.call(acct)
|
||||
end
|
||||
|
||||
def find_or_resolve_status(parent, uri, url)
|
||||
status = find_status(uri)
|
||||
|
||||
@@ -276,12 +285,8 @@ class ProcessFeedService < BaseService
|
||||
!xml.at_xpath('./xmlns:author', xmlns: TagManager::XMLNS).nil?
|
||||
end
|
||||
|
||||
def acct(xml = @xml)
|
||||
username = xml.at_xpath('./xmlns:author/xmlns:name', xmlns: TagManager::XMLNS).content
|
||||
url = xml.at_xpath('./xmlns:author/xmlns:uri', xmlns: TagManager::XMLNS).content
|
||||
domain = Addressable::URI.parse(url).normalize.host
|
||||
|
||||
"#{username}@#{domain}"
|
||||
def redis
|
||||
Redis.current
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@@ -1,6 +1,8 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ProcessInteractionService < BaseService
|
||||
include AuthorExtractor
|
||||
|
||||
# Record locally the remote interaction with our user
|
||||
# @param [String] envelope Salmon envelope
|
||||
# @param [Account] target_account Account the Salmon was addressed to
|
||||
@@ -10,18 +12,9 @@ class ProcessInteractionService < BaseService
|
||||
xml = Nokogiri::XML(body)
|
||||
xml.encoding = 'utf-8'
|
||||
|
||||
return unless contains_author?(xml)
|
||||
account = author_from_xml(xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS))
|
||||
|
||||
username = xml.at_xpath('/xmlns:entry/xmlns:author/xmlns:name', xmlns: TagManager::XMLNS).content
|
||||
url = xml.at_xpath('/xmlns:entry/xmlns:author/xmlns:uri', xmlns: TagManager::XMLNS).content
|
||||
domain = Addressable::URI.parse(url).normalize.host
|
||||
account = Account.find_by(username: username, domain: domain)
|
||||
|
||||
if account.nil?
|
||||
account = follow_remote_account_service.call("#{username}@#{domain}")
|
||||
end
|
||||
|
||||
return if account.suspended?
|
||||
return if account.nil? || account.suspended?
|
||||
|
||||
if salmon.verify(envelope, account.keypair)
|
||||
RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), true)
|
||||
@@ -59,10 +52,6 @@ class ProcessInteractionService < BaseService
|
||||
|
||||
private
|
||||
|
||||
def contains_author?(xml)
|
||||
!(xml.at_xpath('/xmlns:entry/xmlns:author/xmlns:name', xmlns: TagManager::XMLNS).nil? || xml.at_xpath('/xmlns:entry/xmlns:author/xmlns:uri', xmlns: TagManager::XMLNS).nil?)
|
||||
end
|
||||
|
||||
def mentions_account?(xml, account)
|
||||
xml.xpath('/xmlns:entry/xmlns:link[@rel="mentioned"]', xmlns: TagManager::XMLNS).each { |mention_link| return true if [TagManager.instance.uri_for(account), TagManager.instance.url_for(account)].include?(mention_link.attribute('href').value) }
|
||||
false
|
||||
@@ -88,7 +77,7 @@ class ProcessInteractionService < BaseService
|
||||
def authorize_follow_request!(account, target_account)
|
||||
follow_request = FollowRequest.find_by(account: target_account, target_account: account)
|
||||
follow_request&.authorize!
|
||||
SubscribeService.new.call(account) unless account.subscribed?
|
||||
Pubsubhubbub::SubscribeWorker.perform_async(account.id) unless account.subscribed?
|
||||
end
|
||||
|
||||
def reject_follow_request!(account, target_account)
|
||||
@@ -144,16 +133,4 @@ class ProcessInteractionService < BaseService
|
||||
def salmon
|
||||
@salmon ||= OStatus2::Salmon.new
|
||||
end
|
||||
|
||||
def follow_remote_account_service
|
||||
@follow_remote_account_service ||= FollowRemoteAccountService.new
|
||||
end
|
||||
|
||||
def process_feed_service
|
||||
@process_feed_service ||= ProcessFeedService.new
|
||||
end
|
||||
|
||||
def remove_status_service
|
||||
@remove_status_service ||= RemoveStatusService.new
|
||||
end
|
||||
end
|
||||
|
@@ -5,15 +5,31 @@ class SubscribeService < BaseService
|
||||
account.secret = SecureRandom.hex
|
||||
|
||||
subscription = account.subscription(api_subscription_url(account.id))
|
||||
response = subscription.subscribe
|
||||
response = subscription.subscribe
|
||||
|
||||
unless response.successful?
|
||||
if response_failed_permanently?(response)
|
||||
# An error in the 4xx range (except for 429, which is rate limiting)
|
||||
# means we're not allowed to subscribe. Fail and move on
|
||||
account.secret = ''
|
||||
Rails.logger.debug "PuSH subscription request for #{account.acct} failed: #{response.message}"
|
||||
account.save!
|
||||
elsif response_successful?(response)
|
||||
# Anything in the 2xx range means the subscription will be confirmed
|
||||
# asynchronously, we've done what we needed to do
|
||||
account.save!
|
||||
else
|
||||
# What's left is the 5xx range and 429, which means we need to retry
|
||||
# at a later time. Fail loudly!
|
||||
raise "Subscription attempt failed for #{account.acct} (#{account.hub_url}): HTTP #{response.code}"
|
||||
end
|
||||
end
|
||||
|
||||
account.save!
|
||||
rescue HTTP::Error, OpenSSL::SSL::SSLError
|
||||
Rails.logger.debug "PuSH subscription request for #{account.acct} could not be made due to HTTP or SSL error"
|
||||
private
|
||||
|
||||
def response_failed_permanently?(response)
|
||||
response.code > 299 && response.code < 500 && response.code != 429
|
||||
end
|
||||
|
||||
def response_successful?(response)
|
||||
response.code > 199 && response.code < 300
|
||||
end
|
||||
end
|
||||
|
@@ -28,7 +28,7 @@ class UpdateRemoteProfileService < BaseService
|
||||
|
||||
account.save_with_optional_avatar!
|
||||
|
||||
SubscribeService.new.call(account) if resubscribe && (account.hub_url != old_hub_url)
|
||||
Pubsubhubbub::SubscribeWorker.perform_async(account.id) if resubscribe && (account.hub_url != old_hub_url)
|
||||
end
|
||||
|
||||
private
|
||||
|
26
app/services/verify_salmon_service.rb
Normal file
26
app/services/verify_salmon_service.rb
Normal file
@@ -0,0 +1,26 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class VerifySalmonService < BaseService
|
||||
include AuthorExtractor
|
||||
|
||||
def call(payload)
|
||||
body = salmon.unpack(payload)
|
||||
|
||||
xml = Nokogiri::XML(body)
|
||||
xml.encoding = 'utf-8'
|
||||
|
||||
account = author_from_xml(xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS))
|
||||
|
||||
if account.nil?
|
||||
false
|
||||
else
|
||||
salmon.verify(payload, account.keypair)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def salmon
|
||||
@salmon ||= OStatus2::Salmon.new
|
||||
end
|
||||
end
|
@@ -1,7 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ApplicationWorker
|
||||
def info(message)
|
||||
Rails.logger.info("#{self.class.name} - #{message}")
|
||||
end
|
||||
end
|
@@ -1,11 +1,11 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class DistributionWorker < ApplicationWorker
|
||||
class DistributionWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
def perform(status_id)
|
||||
FanOutOnWriteService.new.call(Status.find(status_id))
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
info("Couldn't find the status")
|
||||
true
|
||||
end
|
||||
end
|
||||
|
@@ -25,7 +25,7 @@ class Pubsubhubbub::ConfirmationWorker
|
||||
|
||||
body = response.body.to_s
|
||||
|
||||
Rails.logger.debug "Confirming PuSH subscription for #{subscription.callback_url} with challenge #{challenge}: #{body}"
|
||||
logger.debug "Confirming PuSH subscription for #{subscription.callback_url} with challenge #{challenge}: #{body}"
|
||||
|
||||
if mode == 'subscribe' && body == challenge
|
||||
subscription.save!
|
||||
|
@@ -25,8 +25,8 @@ class Pubsubhubbub::DeliveryWorker
|
||||
.headers(headers)
|
||||
.post(subscription.callback_url, body: payload)
|
||||
|
||||
return subscription.destroy! if response.code > 299 && response.code < 500 && response.code != 429 # HTTP 4xx means error is not temporary, except for 429 (throttling)
|
||||
raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response.code > 199 && response.code < 300
|
||||
return subscription.destroy! if response_failed_permanently?(response) # HTTP 4xx means error is not temporary, except for 429 (throttling)
|
||||
raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response_successful?(response)
|
||||
|
||||
subscription.touch(:last_successful_delivery_at)
|
||||
end
|
||||
@@ -37,4 +37,12 @@ class Pubsubhubbub::DeliveryWorker
|
||||
hmac = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), secret, payload)
|
||||
"sha1=#{hmac}"
|
||||
end
|
||||
|
||||
def response_failed_permanently?(response)
|
||||
response.code > 299 && response.code < 500 && response.code != 429
|
||||
end
|
||||
|
||||
def response_successful?(response)
|
||||
response.code > 199 && response.code < 300
|
||||
end
|
||||
end
|
||||
|
13
app/workers/pubsubhubbub/subscribe_worker.rb
Normal file
13
app/workers/pubsubhubbub/subscribe_worker.rb
Normal file
@@ -0,0 +1,13 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class Pubsubhubbub::SubscribeWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_options queue: 'push'
|
||||
|
||||
def perform(account_id)
|
||||
account = Account.find(account_id)
|
||||
logger.debug "PuSH re-subscribing to #{account.acct}"
|
||||
::SubscribeService.new.call(account)
|
||||
end
|
||||
end
|
20
app/workers/scheduler/subscriptions_scheduler.rb
Normal file
20
app/workers/scheduler/subscriptions_scheduler.rb
Normal file
@@ -0,0 +1,20 @@
|
||||
# frozen_string_literal: true
|
||||
require 'sidekiq-scheduler'
|
||||
|
||||
class Scheduler::SubscriptionsScheduler
|
||||
include Sidekiq::Worker
|
||||
|
||||
def perform
|
||||
logger.info 'Queueing PuSH re-subscriptions'
|
||||
|
||||
expiring_accounts.pluck(:id).each do |id|
|
||||
Pubsubhubbub::SubscribeWorker.perform_async(id)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def expiring_accounts
|
||||
Account.expiring(1.day.from_now).partitioned
|
||||
end
|
||||
end
|
@@ -79,7 +79,4 @@ Rails.application.configure do
|
||||
config.react.variant = :development
|
||||
end
|
||||
|
||||
require 'sidekiq/testing'
|
||||
Sidekiq::Testing.inline!
|
||||
|
||||
ActiveRecordQueryTrace.enabled = ENV.fetch('QUERY_TRACE_ENABLED') { false }
|
||||
|
@@ -109,6 +109,7 @@ Rails.application.configure do
|
||||
|
||||
config.to_prepare do
|
||||
StatsD.backend = StatsD::Instrument::Backends::NullBackend.new if ENV['STATSD_ADDR'].blank?
|
||||
Sidekiq::Logging.logger.level = Logger::WARN
|
||||
end
|
||||
|
||||
config.action_dispatch.default_headers = {
|
||||
|
@@ -1,2 +1,11 @@
|
||||
---
|
||||
:concurrency: 5
|
||||
:queues:
|
||||
- default
|
||||
- push
|
||||
- pull
|
||||
- mailers
|
||||
:schedule:
|
||||
subscriptions_scheduler:
|
||||
cron: '0 5 * * *'
|
||||
class: Scheduler::SubscriptionsScheduler
|
||||
|
@@ -13,7 +13,7 @@ module Mastodon
|
||||
end
|
||||
|
||||
def patch
|
||||
2
|
||||
3
|
||||
end
|
||||
|
||||
def pre
|
||||
|
@@ -78,10 +78,8 @@ namespace :mastodon do
|
||||
|
||||
desc 'Re-subscribes to soon expiring PuSH subscriptions'
|
||||
task refresh: :environment do
|
||||
Account.expiring(1.day.from_now).find_each do |a|
|
||||
Rails.logger.debug "PuSH re-subscribing to #{a.acct}"
|
||||
SubscribeService.new.call(a)
|
||||
end
|
||||
# No-op
|
||||
# This task is now executed via sidekiq-scheduler
|
||||
end
|
||||
end
|
||||
|
||||
|
@@ -45,8 +45,8 @@ RSpec.describe Api::SubscriptionsController, type: :controller do
|
||||
post :update, params: { id: account.id }
|
||||
end
|
||||
|
||||
it 'returns http created' do
|
||||
expect(response).to have_http_status(:created)
|
||||
it 'returns http success' do
|
||||
expect(response).to have_http_status(:success)
|
||||
end
|
||||
|
||||
it 'creates statuses for feed' do
|
||||
|
@@ -53,10 +53,11 @@ RSpec.describe FollowService do
|
||||
end
|
||||
|
||||
describe 'unlocked account' do
|
||||
let(:bob) { Fabricate(:user, email: 'bob@example.com', account: Fabricate(:account, username: 'bob', domain: 'example.com', salmon_url: 'http://salmon.example.com')).account }
|
||||
let(:bob) { Fabricate(:user, email: 'bob@example.com', account: Fabricate(:account, username: 'bob', domain: 'example.com', salmon_url: 'http://salmon.example.com', hub_url: 'http://hub.example.com')).account }
|
||||
|
||||
before do
|
||||
stub_request(:post, "http://salmon.example.com/").to_return(:status => 200, :body => "", :headers => {})
|
||||
stub_request(:post, "http://hub.example.com/").to_return(status: 202)
|
||||
subject.call(sender, bob.acct)
|
||||
end
|
||||
|
||||
@@ -70,6 +71,10 @@ RSpec.describe FollowService do
|
||||
xml.match(TagManager::VERBS[:follow])
|
||||
}).to have_been_made.once
|
||||
end
|
||||
|
||||
it 'subscribes to PuSH' do
|
||||
expect(a_request(:post, "http://hub.example.com/")).to have_been_made.once
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@@ -154,4 +154,48 @@ XML
|
||||
expect(created_statuses.first.reblog.account_id).to eq good_actor.id
|
||||
expect(created_statuses.first.reblog.text).to eq 'Overwatch rocks'
|
||||
end
|
||||
|
||||
it 'ignores statuses with an out-of-order delete' do
|
||||
sender = Fabricate(:account, username: 'tracer', domain: 'overwatch.com')
|
||||
|
||||
delete_body = <<XML
|
||||
<?xml version="1.0"?>
|
||||
<entry xmlns="http://www.w3.org/2005/Atom" xmlns:thr="http://purl.org/syndication/thread/1.0" xmlns:activity="http://activitystrea.ms/spec/1.0/" xmlns:poco="http://portablecontacts.net/spec/1.0" xmlns:media="http://purl.org/syndication/atommedia" xmlns:ostatus="http://ostatus.org/schema/1.0" xmlns:mastodon="http://mastodon.social/schema/1.0">
|
||||
<id>tag:overwatch.com,2017-04-27:objectId=4487555:objectType=Status</id>
|
||||
<published>2017-04-27T13:49:25Z</published>
|
||||
<updated>2017-04-27T13:49:25Z</updated>
|
||||
<activity:object-type>http://activitystrea.ms/schema/1.0/note</activity:object-type>
|
||||
<activity:verb>http://activitystrea.ms/schema/1.0/delete</activity:verb>
|
||||
<author>
|
||||
<id>https://overwatch.com/users/tracer</id>
|
||||
<activity:object-type>http://activitystrea.ms/schema/1.0/person</activity:object-type>
|
||||
<uri>https://overwatch.com/users/tracer</uri>
|
||||
<name>tracer</name>
|
||||
</author>
|
||||
</entry>
|
||||
XML
|
||||
|
||||
status_body = <<XML
|
||||
<?xml version="1.0"?>
|
||||
<entry xmlns="http://www.w3.org/2005/Atom" xmlns:thr="http://purl.org/syndication/thread/1.0" xmlns:activity="http://activitystrea.ms/spec/1.0/" xmlns:poco="http://portablecontacts.net/spec/1.0" xmlns:media="http://purl.org/syndication/atommedia" xmlns:ostatus="http://ostatus.org/schema/1.0" xmlns:mastodon="http://mastodon.social/schema/1.0">
|
||||
<id>tag:overwatch.com,2017-04-27:objectId=4487555:objectType=Status</id>
|
||||
<published>2017-04-27T13:49:25Z</published>
|
||||
<updated>2017-04-27T13:49:25Z</updated>
|
||||
<activity:object-type>http://activitystrea.ms/schema/1.0/note</activity:object-type>
|
||||
<activity:verb>http://activitystrea.ms/schema/1.0/post</activity:verb>
|
||||
<author>
|
||||
<id>https://overwatch.com/users/tracer</id>
|
||||
<activity:object-type>http://activitystrea.ms/schema/1.0/person</activity:object-type>
|
||||
<uri>https://overwatch.com/users/tracer</uri>
|
||||
<name>tracer</name>
|
||||
</author>
|
||||
<content type="html">Overwatch rocks</content>
|
||||
</entry>
|
||||
XML
|
||||
|
||||
subject.call(delete_body, sender)
|
||||
created_statuses = subject.call(status_body, sender)
|
||||
|
||||
expect(created_statuses).to be_empty
|
||||
end
|
||||
end
|
||||
|
38
spec/services/subscribe_service_spec.rb
Normal file
38
spec/services/subscribe_service_spec.rb
Normal file
@@ -0,0 +1,38 @@
|
||||
require 'rails_helper'
|
||||
|
||||
RSpec.describe SubscribeService do
|
||||
let(:account) { Fabricate(:account, username: 'bob', domain: 'example.com', hub_url: 'http://hub.example.com') }
|
||||
subject { SubscribeService.new }
|
||||
|
||||
it 'sends subscription request to PuSH hub' do
|
||||
stub_request(:post, 'http://hub.example.com/').to_return(status: 202)
|
||||
subject.call(account)
|
||||
expect(a_request(:post, 'http://hub.example.com/')).to have_been_made.once
|
||||
end
|
||||
|
||||
it 'generates and keeps PuSH secret on successful call' do
|
||||
stub_request(:post, 'http://hub.example.com/').to_return(status: 202)
|
||||
subject.call(account)
|
||||
expect(account.secret).to_not be_blank
|
||||
end
|
||||
|
||||
it 'fails silently if PuSH hub forbids subscription' do
|
||||
stub_request(:post, 'http://hub.example.com/').to_return(status: 403)
|
||||
subject.call(account)
|
||||
end
|
||||
|
||||
it 'fails silently if PuSH hub is not found' do
|
||||
stub_request(:post, 'http://hub.example.com/').to_return(status: 404)
|
||||
subject.call(account)
|
||||
end
|
||||
|
||||
it 'fails loudly if there is a network error' do
|
||||
stub_request(:post, 'http://hub.example.com/').to_raise(HTTP::Error)
|
||||
expect { subject.call(account) }.to raise_error HTTP::Error
|
||||
end
|
||||
|
||||
it 'fails loudly if PuSH hub is unavailable' do
|
||||
stub_request(:post, 'http://hub.example.com/').to_return(status: 503)
|
||||
expect { subject.call(account) }.to raise_error
|
||||
end
|
||||
end
|
19
spec/workers/scheduler/subscriptions_scheduler_spec.rb
Normal file
19
spec/workers/scheduler/subscriptions_scheduler_spec.rb
Normal file
@@ -0,0 +1,19 @@
|
||||
require 'rails_helper'
|
||||
|
||||
describe Scheduler::SubscriptionsScheduler do
|
||||
subject { Scheduler::SubscriptionsScheduler.new }
|
||||
|
||||
let!(:expiring_account1) { Fabricate(:account, subscription_expires_at: 20.minutes.from_now, domain: 'example.com', followers_count: 1, hub_url: 'http://hub.example.com') }
|
||||
let!(:expiring_account2) { Fabricate(:account, subscription_expires_at: 4.hours.from_now, domain: 'example.org', followers_count: 1, hub_url: 'http://hub.example.org') }
|
||||
|
||||
before do
|
||||
stub_request(:post, 'http://hub.example.com/').to_return(status: 202)
|
||||
stub_request(:post, 'http://hub.example.org/').to_return(status: 202)
|
||||
end
|
||||
|
||||
it 're-subscribes for all expiring accounts' do
|
||||
subject.perform
|
||||
expect(a_request(:post, 'http://hub.example.com/')).to have_been_made.once
|
||||
expect(a_request(:post, 'http://hub.example.org/')).to have_been_made.once
|
||||
end
|
||||
end
|
Reference in New Issue
Block a user