Compare commits
	
		
			5 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | a0f7453c6e | ||
|  | 46a1e16f21 | ||
|  | f3f7a3840a | ||
|  | 7539254e96 | ||
|  | 456478c4e1 | 
							
								
								
									
										3
									
								
								Gemfile
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								Gemfile
									
									
									
									
									
								
							| @@ -38,7 +38,7 @@ gem 'link_header' | ||||
| gem 'local_time' | ||||
| gem 'nokogiri' | ||||
| gem 'oj' | ||||
| gem 'ostatus2', '~> 1.1' | ||||
| gem 'ostatus2', '~> 2.0' | ||||
| gem 'ox' | ||||
| gem 'rabl' | ||||
| gem 'rack-attack' | ||||
| @@ -51,6 +51,7 @@ gem 'rqrcode' | ||||
| gem 'ruby-oembed', require: 'oembed' | ||||
| gem 'sanitize' | ||||
| gem 'sidekiq' | ||||
| gem 'sidekiq-scheduler' | ||||
| gem 'sidekiq-unique-jobs' | ||||
| gem 'simple-navigation' | ||||
| gem 'simple_form' | ||||
|   | ||||
							
								
								
									
										14
									
								
								Gemfile.lock
									
									
									
									
									
								
							
							
						
						
									
										14
									
								
								Gemfile.lock
									
									
									
									
									
								
							| @@ -153,6 +153,8 @@ GEM | ||||
|       thread_safe | ||||
|     encryptor (3.0.0) | ||||
|     erubis (2.7.0) | ||||
|     et-orbi (1.0.3) | ||||
|       tzinfo | ||||
|     execjs (2.7.0) | ||||
|     fabrication (2.16.1) | ||||
|     faker (1.7.3) | ||||
| @@ -265,7 +267,7 @@ GEM | ||||
|     oj (3.0.2) | ||||
|     openssl (2.0.3) | ||||
|     orm_adapter (0.5.0) | ||||
|     ostatus2 (1.1.0) | ||||
|     ostatus2 (2.0.0) | ||||
|       addressable (~> 2.4) | ||||
|       http (~> 2.0) | ||||
|       nokogiri (~> 1.6) | ||||
| @@ -402,6 +404,8 @@ GEM | ||||
|       unicode-display_width (~> 1.0, >= 1.0.1) | ||||
|     ruby-oembed (0.12.0) | ||||
|     ruby-progressbar (1.8.1) | ||||
|     rufus-scheduler (3.4.0) | ||||
|       et-orbi (~> 1.0) | ||||
|     safe_yaml (1.0.4) | ||||
|     sanitize (4.4.0) | ||||
|       crass (~> 1.0.2) | ||||
| @@ -419,6 +423,11 @@ GEM | ||||
|       connection_pool (~> 2.2, >= 2.2.0) | ||||
|       rack-protection (>= 1.5.0) | ||||
|       redis (~> 3.2, >= 3.2.1) | ||||
|     sidekiq-scheduler (2.1.4) | ||||
|       redis (~> 3) | ||||
|       rufus-scheduler (~> 3.2) | ||||
|       sidekiq (>= 3) | ||||
|       tilt (>= 1.4.0) | ||||
|     sidekiq-unique-jobs (5.0.0) | ||||
|       sidekiq (>= 4.0) | ||||
|       thor | ||||
| @@ -523,7 +532,7 @@ DEPENDENCIES | ||||
|   microformats2 | ||||
|   nokogiri | ||||
|   oj | ||||
|   ostatus2 (~> 1.1) | ||||
|   ostatus2 (~> 2.0) | ||||
|   ox | ||||
|   paperclip (~> 5.1) | ||||
|   paperclip-av-transcoder | ||||
| @@ -552,6 +561,7 @@ DEPENDENCIES | ||||
|   sanitize | ||||
|   sass-rails (~> 5.0) | ||||
|   sidekiq | ||||
|   sidekiq-scheduler | ||||
|   sidekiq-unique-jobs | ||||
|   simple-navigation | ||||
|   simple_form | ||||
|   | ||||
							
								
								
									
										2
									
								
								Procfile
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								Procfile
									
									
									
									
									
								
							| @@ -1,2 +1,2 @@ | ||||
| web: bundle exec puma -C config/puma.rb | ||||
| worker: bundle exec sidekiq -q default -q push -q pull -q mailers | ||||
| worker: bundle exec sidekiq | ||||
|   | ||||
| @@ -5,13 +5,13 @@ class Api::SalmonController < ApiController | ||||
|   respond_to :txt | ||||
|  | ||||
|   def update | ||||
|     body = request.body.read | ||||
|     payload = request.body.read | ||||
|  | ||||
|     if body.nil? | ||||
|       head 200 | ||||
|     else | ||||
|       SalmonWorker.perform_async(@account.id, body.force_encoding('UTF-8')) | ||||
|     if !payload.nil? && verify?(payload) | ||||
|       SalmonWorker.perform_async(@account.id, payload.force_encoding('UTF-8')) | ||||
|       head 201 | ||||
|     else | ||||
|       head 202 | ||||
|     end | ||||
|   end | ||||
|  | ||||
| @@ -20,4 +20,8 @@ class Api::SalmonController < ApiController | ||||
|   def set_account | ||||
|     @account = Account.find(params[:id]) | ||||
|   end | ||||
|  | ||||
|   def verify?(payload) | ||||
|     VerifySalmonService.new.call(payload) | ||||
|   end | ||||
| end | ||||
|   | ||||
| @@ -19,10 +19,9 @@ class Api::SubscriptionsController < ApiController | ||||
|  | ||||
|     if subscription.verify(body, request.headers['HTTP_X_HUB_SIGNATURE']) | ||||
|       ProcessingWorker.perform_async(@account.id, body.force_encoding('UTF-8')) | ||||
|       head 201 | ||||
|     else | ||||
|       head 202 | ||||
|     end | ||||
|  | ||||
|     head 200 | ||||
|   end | ||||
|  | ||||
|   private | ||||
|   | ||||
| @@ -1,13 +1,17 @@ | ||||
| # frozen_string_literal: true | ||||
|  | ||||
| module HttpHelper | ||||
|   USER_AGENT = "#{HTTP::Request::USER_AGENT} (Mastodon/#{Mastodon::Version}; +http://#{Rails.configuration.x.local_domain}/)" | ||||
|  | ||||
|   def http_client(options = {}) | ||||
|     timeout = { write: 10, connect: 10, read: 10 }.merge(options) | ||||
|  | ||||
|     HTTP.headers(user_agent: USER_AGENT) | ||||
|     HTTP.headers(user_agent: user_agent) | ||||
|         .timeout(:per_operation, timeout) | ||||
|         .follow | ||||
|   end | ||||
|  | ||||
|   private | ||||
|  | ||||
|   def user_agent | ||||
|     @user_agent ||= "#{HTTP::Request::USER_AGENT} (Mastodon/#{Mastodon::Version}; +http://#{Rails.configuration.x.local_domain}/)" | ||||
|   end | ||||
| end | ||||
|   | ||||
| @@ -65,8 +65,10 @@ class TagManager | ||||
|   end | ||||
|  | ||||
|   def normalize_domain(domain) | ||||
|     return if domain.nil? | ||||
|  | ||||
|     uri = Addressable::URI.new | ||||
|     uri.host = domain | ||||
|     uri.host = domain.gsub(/[\/]/, '') | ||||
|     uri.normalize.host | ||||
|   end | ||||
|  | ||||
|   | ||||
| @@ -65,9 +65,10 @@ class Account < ApplicationRecord | ||||
|  | ||||
|   scope :remote, -> { where.not(domain: nil) } | ||||
|   scope :local, -> { where(domain: nil) } | ||||
|   scope :without_followers, -> { where('(select count(f.id) from follows as f where f.target_account_id = accounts.id) = 0') } | ||||
|   scope :with_followers, -> { where('(select count(f.id) from follows as f where f.target_account_id = accounts.id) > 0') } | ||||
|   scope :without_followers, -> { where(followers_count: 0) } | ||||
|   scope :with_followers, -> { where('followers_count > 0') } | ||||
|   scope :expiring, ->(time) { where(subscription_expires_at: nil).or(where('subscription_expires_at < ?', time)).remote.with_followers } | ||||
|   scope :partitioned, -> { order('row_number() over (partition by domain)') } | ||||
|   scope :silenced, -> { where(silenced: true) } | ||||
|   scope :suspended, -> { where(suspended: true) } | ||||
|   scope :recent, -> { reorder(id: :desc) } | ||||
|   | ||||
							
								
								
									
										21
									
								
								app/services/concerns/author_extractor.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								app/services/concerns/author_extractor.rb
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,21 @@ | ||||
| # frozen_string_literal: true | ||||
|  | ||||
| module AuthorExtractor | ||||
|   def author_from_xml(xml) | ||||
|     # Try <email> for acct | ||||
|     acct = xml.at_xpath('./xmlns:author/xmlns:email', xmlns: TagManager::XMLNS)&.content | ||||
|  | ||||
|     # Try <name> + <uri> | ||||
|     if acct.blank? | ||||
|       username = xml.at_xpath('./xmlns:author/xmlns:name', xmlns: TagManager::XMLNS)&.content | ||||
|       uri      = xml.at_xpath('./xmlns:author/xmlns:uri', xmlns: TagManager::XMLNS)&.content | ||||
|  | ||||
|       return nil if username.blank? || uri.blank? | ||||
|  | ||||
|       domain = Addressable::URI.parse(uri).normalize.host | ||||
|       acct   = "#{username}@#{domain}" | ||||
|     end | ||||
|  | ||||
|     FollowRemoteAccountService.new.call(acct) | ||||
|   end | ||||
| end | ||||
| @@ -1,6 +1,8 @@ | ||||
| # frozen_string_literal: true | ||||
|  | ||||
| class FetchRemoteAccountService < BaseService | ||||
|   include AuthorExtractor | ||||
|  | ||||
|   def call(url, prefetched_body = nil) | ||||
|     if prefetched_body.nil? | ||||
|       atom_url, body = FetchAtomService.new.call(url) | ||||
| @@ -19,21 +21,10 @@ class FetchRemoteAccountService < BaseService | ||||
|     xml = Nokogiri::XML(body) | ||||
|     xml.encoding = 'utf-8' | ||||
|  | ||||
|     email = xml.at_xpath('//xmlns:author/xmlns:email').try(:content) | ||||
|     if email.nil? | ||||
|       url_parts = Addressable::URI.parse(url).normalize | ||||
|       username  = xml.at_xpath('//xmlns:author/xmlns:name').try(:content) | ||||
|       domain    = url_parts.host | ||||
|     else | ||||
|       username, domain = email.split('@') | ||||
|     end | ||||
|     account = author_from_xml(xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS)) | ||||
|  | ||||
|     return nil if username.nil? || domain.nil? | ||||
|  | ||||
|     Rails.logger.debug "Going to webfinger #{username}@#{domain}" | ||||
|  | ||||
|     account = FollowRemoteAccountService.new.call("#{username}@#{domain}") | ||||
|     UpdateRemoteProfileService.new.call(xml, account) unless account.nil? | ||||
|  | ||||
|     account | ||||
|   rescue TypeError | ||||
|     Rails.logger.debug "Unparseable URL given: #{url}" | ||||
|   | ||||
| @@ -1,6 +1,8 @@ | ||||
| # frozen_string_literal: true | ||||
|  | ||||
| class FetchRemoteStatusService < BaseService | ||||
|   include AuthorExtractor | ||||
|  | ||||
|   def call(url, prefetched_body = nil) | ||||
|     if prefetched_body.nil? | ||||
|       atom_url, body = FetchAtomService.new.call(url) | ||||
| @@ -21,37 +23,19 @@ class FetchRemoteStatusService < BaseService | ||||
|     xml = Nokogiri::XML(body) | ||||
|     xml.encoding = 'utf-8' | ||||
|  | ||||
|     account = extract_author(url, xml) | ||||
|     account = author_from_xml(xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS)) | ||||
|     domain  = Addressable::URI.parse(url).normalize.host | ||||
|  | ||||
|     return nil if account.nil? | ||||
|     return nil unless !account.nil? && confirmed_domain?(domain, account) | ||||
|  | ||||
|     statuses = ProcessFeedService.new.call(body, account) | ||||
|  | ||||
|     statuses.first | ||||
|   end | ||||
|  | ||||
|   def extract_author(url, xml) | ||||
|     url_parts = Addressable::URI.parse(url).normalize | ||||
|     username  = xml.at_xpath('//xmlns:author/xmlns:name').try(:content) | ||||
|     domain    = url_parts.host | ||||
|  | ||||
|     return nil if username.nil? | ||||
|  | ||||
|     Rails.logger.debug "Going to webfinger #{username}@#{domain}" | ||||
|  | ||||
|     account = FollowRemoteAccountService.new.call("#{username}@#{domain}") | ||||
|  | ||||
|     # If the author's confirmed URLs do not match the domain of the URL | ||||
|     # we are reading this from, abort | ||||
|     return nil unless confirmed_domain?(domain, account) | ||||
|  | ||||
|     account | ||||
|   rescue Nokogiri::XML::XPath::SyntaxError | ||||
|     Rails.logger.debug 'Invalid XML or missing namespace' | ||||
|     nil | ||||
|   end | ||||
|  | ||||
|   def confirmed_domain?(domain, account) | ||||
|     domain.casecmp(account.domain).zero? || domain.casecmp(Addressable::URI.parse(account.remote_url).normalize.host).zero? | ||||
|     account.domain.nil? || domain.casecmp(account.domain).zero? || domain.casecmp(Addressable::URI.parse(account.remote_url).normalize.host).zero? | ||||
|   end | ||||
| end | ||||
|   | ||||
| @@ -40,7 +40,7 @@ class FollowService < BaseService | ||||
|     if target_account.local? | ||||
|       NotifyService.new.call(target_account, follow) | ||||
|     else | ||||
|       SubscribeService.new.call(target_account) unless target_account.subscribed? | ||||
|       Pubsubhubbub::SubscribeWorker.perform_async(target_account.id) unless target_account.subscribed? | ||||
|       NotificationWorker.perform_async(build_follow_xml(follow), source_account.id, target_account.id) | ||||
|       AfterRemoteFollowWorker.perform_async(follow.id) | ||||
|     end | ||||
|   | ||||
| @@ -20,6 +20,8 @@ class ProcessFeedService < BaseService | ||||
|   end | ||||
|  | ||||
|   class ProcessEntry | ||||
|     include AuthorExtractor | ||||
|  | ||||
|     def call(xml, account) | ||||
|       @account = account | ||||
|       @xml     = xml | ||||
| @@ -40,6 +42,11 @@ class ProcessFeedService < BaseService | ||||
|     private | ||||
|  | ||||
|     def create_status | ||||
|       if redis.exists("delete_upon_arrival:#{id}") | ||||
|         Rails.logger.debug "Delete for status #{id} was queued, ignoring" | ||||
|         return | ||||
|       end | ||||
|  | ||||
|       Rails.logger.debug "Creating remote status #{id}" | ||||
|       status, just_created = status_from_xml(@xml) | ||||
|  | ||||
| @@ -82,7 +89,13 @@ class ProcessFeedService < BaseService | ||||
|     def delete_status | ||||
|       Rails.logger.debug "Deleting remote status #{id}" | ||||
|       status = Status.find_by(uri: id) | ||||
|       RemoveStatusService.new.call(status) unless status.nil? | ||||
|  | ||||
|       if status.nil? | ||||
|         redis.setex("delete_upon_arrival:#{id}", 6 * 3_600, id) | ||||
|       else | ||||
|         RemoveStatusService.new.call(status) | ||||
|       end | ||||
|  | ||||
|       nil | ||||
|     end | ||||
|  | ||||
| @@ -108,7 +121,7 @@ class ProcessFeedService < BaseService | ||||
|       # If that author cannot be found, don't record the status (do not misattribute) | ||||
|       if account?(entry) | ||||
|         begin | ||||
|           account = find_or_resolve_account(acct(entry)) | ||||
|           account = author_from_xml(entry) | ||||
|           return [nil, false] if account.nil? | ||||
|         rescue Goldfinger::Error | ||||
|           return [nil, false] | ||||
| @@ -143,10 +156,6 @@ class ProcessFeedService < BaseService | ||||
|       [status, true] | ||||
|     end | ||||
|  | ||||
|     def find_or_resolve_account(acct) | ||||
|       FollowRemoteAccountService.new.call(acct) | ||||
|     end | ||||
|  | ||||
|     def find_or_resolve_status(parent, uri, url) | ||||
|       status = find_status(uri) | ||||
|  | ||||
| @@ -276,12 +285,8 @@ class ProcessFeedService < BaseService | ||||
|       !xml.at_xpath('./xmlns:author', xmlns: TagManager::XMLNS).nil? | ||||
|     end | ||||
|  | ||||
|     def acct(xml = @xml) | ||||
|       username = xml.at_xpath('./xmlns:author/xmlns:name', xmlns: TagManager::XMLNS).content | ||||
|       url      = xml.at_xpath('./xmlns:author/xmlns:uri', xmlns: TagManager::XMLNS).content | ||||
|       domain   = Addressable::URI.parse(url).normalize.host | ||||
|  | ||||
|       "#{username}@#{domain}" | ||||
|     def redis | ||||
|       Redis.current | ||||
|     end | ||||
|   end | ||||
| end | ||||
|   | ||||
| @@ -1,6 +1,8 @@ | ||||
| # frozen_string_literal: true | ||||
|  | ||||
| class ProcessInteractionService < BaseService | ||||
|   include AuthorExtractor | ||||
|  | ||||
|   # Record locally the remote interaction with our user | ||||
|   # @param [String] envelope Salmon envelope | ||||
|   # @param [Account] target_account Account the Salmon was addressed to | ||||
| @@ -10,18 +12,9 @@ class ProcessInteractionService < BaseService | ||||
|     xml = Nokogiri::XML(body) | ||||
|     xml.encoding = 'utf-8' | ||||
|  | ||||
|     return unless contains_author?(xml) | ||||
|     account = author_from_xml(xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS)) | ||||
|  | ||||
|     username = xml.at_xpath('/xmlns:entry/xmlns:author/xmlns:name', xmlns: TagManager::XMLNS).content | ||||
|     url      = xml.at_xpath('/xmlns:entry/xmlns:author/xmlns:uri', xmlns: TagManager::XMLNS).content | ||||
|     domain   = Addressable::URI.parse(url).normalize.host | ||||
|     account  = Account.find_by(username: username, domain: domain) | ||||
|  | ||||
|     if account.nil? | ||||
|       account = follow_remote_account_service.call("#{username}@#{domain}") | ||||
|     end | ||||
|  | ||||
|     return if account.suspended? | ||||
|     return if account.nil? || account.suspended? | ||||
|  | ||||
|     if salmon.verify(envelope, account.keypair) | ||||
|       RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), true) | ||||
| @@ -59,10 +52,6 @@ class ProcessInteractionService < BaseService | ||||
|  | ||||
|   private | ||||
|  | ||||
|   def contains_author?(xml) | ||||
|     !(xml.at_xpath('/xmlns:entry/xmlns:author/xmlns:name', xmlns: TagManager::XMLNS).nil? || xml.at_xpath('/xmlns:entry/xmlns:author/xmlns:uri', xmlns: TagManager::XMLNS).nil?) | ||||
|   end | ||||
|  | ||||
|   def mentions_account?(xml, account) | ||||
|     xml.xpath('/xmlns:entry/xmlns:link[@rel="mentioned"]', xmlns: TagManager::XMLNS).each { |mention_link| return true if [TagManager.instance.uri_for(account), TagManager.instance.url_for(account)].include?(mention_link.attribute('href').value) } | ||||
|     false | ||||
| @@ -88,7 +77,7 @@ class ProcessInteractionService < BaseService | ||||
|   def authorize_follow_request!(account, target_account) | ||||
|     follow_request = FollowRequest.find_by(account: target_account, target_account: account) | ||||
|     follow_request&.authorize! | ||||
|     SubscribeService.new.call(account) unless account.subscribed? | ||||
|     Pubsubhubbub::SubscribeWorker.perform_async(account.id) unless account.subscribed? | ||||
|   end | ||||
|  | ||||
|   def reject_follow_request!(account, target_account) | ||||
| @@ -144,16 +133,4 @@ class ProcessInteractionService < BaseService | ||||
|   def salmon | ||||
|     @salmon ||= OStatus2::Salmon.new | ||||
|   end | ||||
|  | ||||
|   def follow_remote_account_service | ||||
|     @follow_remote_account_service ||= FollowRemoteAccountService.new | ||||
|   end | ||||
|  | ||||
|   def process_feed_service | ||||
|     @process_feed_service ||= ProcessFeedService.new | ||||
|   end | ||||
|  | ||||
|   def remove_status_service | ||||
|     @remove_status_service ||= RemoveStatusService.new | ||||
|   end | ||||
| end | ||||
|   | ||||
| @@ -5,15 +5,31 @@ class SubscribeService < BaseService | ||||
|     account.secret = SecureRandom.hex | ||||
|  | ||||
|     subscription = account.subscription(api_subscription_url(account.id)) | ||||
|     response = subscription.subscribe | ||||
|     response     = subscription.subscribe | ||||
|  | ||||
|     unless response.successful? | ||||
|     if response_failed_permanently?(response) | ||||
|       # An error in the 4xx range (except for 429, which is rate limiting) | ||||
|       # means we're not allowed to subscribe. Fail and move on | ||||
|       account.secret = '' | ||||
|       Rails.logger.debug "PuSH subscription request for #{account.acct} failed: #{response.message}" | ||||
|       account.save! | ||||
|     elsif response_successful?(response) | ||||
|       # Anything in the 2xx range means the subscription will be confirmed | ||||
|       # asynchronously, we've done what we needed to do | ||||
|       account.save! | ||||
|     else | ||||
|       # What's left is the 5xx range and 429, which means we need to retry | ||||
|       # at a later time. Fail loudly! | ||||
|       raise "Subscription attempt failed for #{account.acct} (#{account.hub_url}): HTTP #{response.code}" | ||||
|     end | ||||
|   end | ||||
|  | ||||
|     account.save! | ||||
|   rescue HTTP::Error, OpenSSL::SSL::SSLError | ||||
|     Rails.logger.debug "PuSH subscription request for #{account.acct} could not be made due to HTTP or SSL error" | ||||
|   private | ||||
|  | ||||
|   def response_failed_permanently?(response) | ||||
|     response.code > 299 && response.code < 500 && response.code != 429 | ||||
|   end | ||||
|  | ||||
|   def response_successful?(response) | ||||
|     response.code > 199 && response.code < 300 | ||||
|   end | ||||
| end | ||||
|   | ||||
| @@ -28,7 +28,7 @@ class UpdateRemoteProfileService < BaseService | ||||
|  | ||||
|     account.save_with_optional_avatar! | ||||
|  | ||||
|     SubscribeService.new.call(account) if resubscribe && (account.hub_url != old_hub_url) | ||||
|     Pubsubhubbub::SubscribeWorker.perform_async(account.id) if resubscribe && (account.hub_url != old_hub_url) | ||||
|   end | ||||
|  | ||||
|   private | ||||
|   | ||||
							
								
								
									
										26
									
								
								app/services/verify_salmon_service.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										26
									
								
								app/services/verify_salmon_service.rb
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,26 @@ | ||||
| # frozen_string_literal: true | ||||
|  | ||||
| class VerifySalmonService < BaseService | ||||
|   include AuthorExtractor | ||||
|  | ||||
|   def call(payload) | ||||
|     body = salmon.unpack(payload) | ||||
|  | ||||
|     xml = Nokogiri::XML(body) | ||||
|     xml.encoding = 'utf-8' | ||||
|  | ||||
|     account = author_from_xml(xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS)) | ||||
|  | ||||
|     if account.nil? | ||||
|       false | ||||
|     else | ||||
|       salmon.verify(payload, account.keypair) | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   private | ||||
|  | ||||
|   def salmon | ||||
|     @salmon ||= OStatus2::Salmon.new | ||||
|   end | ||||
| end | ||||
| @@ -1,7 +0,0 @@ | ||||
| # frozen_string_literal: true | ||||
|  | ||||
| class ApplicationWorker | ||||
|   def info(message) | ||||
|     Rails.logger.info("#{self.class.name} - #{message}") | ||||
|   end | ||||
| end | ||||
| @@ -1,11 +1,11 @@ | ||||
| # frozen_string_literal: true | ||||
|  | ||||
| class DistributionWorker < ApplicationWorker | ||||
| class DistributionWorker | ||||
|   include Sidekiq::Worker | ||||
|  | ||||
|   def perform(status_id) | ||||
|     FanOutOnWriteService.new.call(Status.find(status_id)) | ||||
|   rescue ActiveRecord::RecordNotFound | ||||
|     info("Couldn't find the status") | ||||
|     true | ||||
|   end | ||||
| end | ||||
|   | ||||
| @@ -25,7 +25,7 @@ class Pubsubhubbub::ConfirmationWorker | ||||
|  | ||||
|     body = response.body.to_s | ||||
|  | ||||
|     Rails.logger.debug "Confirming PuSH subscription for #{subscription.callback_url} with challenge #{challenge}: #{body}" | ||||
|     logger.debug "Confirming PuSH subscription for #{subscription.callback_url} with challenge #{challenge}: #{body}" | ||||
|  | ||||
|     if mode == 'subscribe' && body == challenge | ||||
|       subscription.save! | ||||
|   | ||||
| @@ -25,8 +25,8 @@ class Pubsubhubbub::DeliveryWorker | ||||
|                    .headers(headers) | ||||
|                    .post(subscription.callback_url, body: payload) | ||||
|  | ||||
|     return subscription.destroy! if response.code > 299 && response.code < 500 && response.code != 429 # HTTP 4xx means error is not temporary, except for 429 (throttling) | ||||
|     raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response.code > 199 && response.code < 300 | ||||
|     return subscription.destroy! if response_failed_permanently?(response) # HTTP 4xx means error is not temporary, except for 429 (throttling) | ||||
|     raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response_successful?(response) | ||||
|  | ||||
|     subscription.touch(:last_successful_delivery_at) | ||||
|   end | ||||
| @@ -37,4 +37,12 @@ class Pubsubhubbub::DeliveryWorker | ||||
|     hmac = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), secret, payload) | ||||
|     "sha1=#{hmac}" | ||||
|   end | ||||
|  | ||||
|   def response_failed_permanently?(response) | ||||
|     response.code > 299 && response.code < 500 && response.code != 429 | ||||
|   end | ||||
|  | ||||
|   def response_successful?(response) | ||||
|     response.code > 199 && response.code < 300 | ||||
|   end | ||||
| end | ||||
|   | ||||
							
								
								
									
										13
									
								
								app/workers/pubsubhubbub/subscribe_worker.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										13
									
								
								app/workers/pubsubhubbub/subscribe_worker.rb
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,13 @@ | ||||
| # frozen_string_literal: true | ||||
|  | ||||
| class Pubsubhubbub::SubscribeWorker | ||||
|   include Sidekiq::Worker | ||||
|  | ||||
|   sidekiq_options queue: 'push' | ||||
|  | ||||
|   def perform(account_id) | ||||
|     account = Account.find(account_id) | ||||
|     logger.debug "PuSH re-subscribing to #{account.acct}" | ||||
|     ::SubscribeService.new.call(account) | ||||
|   end | ||||
| end | ||||
							
								
								
									
										20
									
								
								app/workers/scheduler/subscriptions_scheduler.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								app/workers/scheduler/subscriptions_scheduler.rb
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,20 @@ | ||||
| # frozen_string_literal: true | ||||
| require 'sidekiq-scheduler' | ||||
|  | ||||
| class Scheduler::SubscriptionsScheduler | ||||
|   include Sidekiq::Worker | ||||
|  | ||||
|   def perform | ||||
|     logger.info 'Queueing PuSH re-subscriptions' | ||||
|  | ||||
|     expiring_accounts.pluck(:id).each do |id| | ||||
|       Pubsubhubbub::SubscribeWorker.perform_async(id) | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   private | ||||
|  | ||||
|   def expiring_accounts | ||||
|     Account.expiring(1.day.from_now).partitioned | ||||
|   end | ||||
| end | ||||
| @@ -79,7 +79,4 @@ Rails.application.configure do | ||||
|   config.react.variant = :development | ||||
| end | ||||
|  | ||||
| require 'sidekiq/testing' | ||||
| Sidekiq::Testing.inline! | ||||
|  | ||||
| ActiveRecordQueryTrace.enabled = ENV.fetch('QUERY_TRACE_ENABLED') { false } | ||||
|   | ||||
| @@ -109,6 +109,7 @@ Rails.application.configure do | ||||
|  | ||||
|   config.to_prepare do | ||||
|     StatsD.backend = StatsD::Instrument::Backends::NullBackend.new if ENV['STATSD_ADDR'].blank? | ||||
|     Sidekiq::Logging.logger.level = Logger::WARN | ||||
|   end | ||||
|  | ||||
|   config.action_dispatch.default_headers = { | ||||
|   | ||||
| @@ -1,2 +1,11 @@ | ||||
| --- | ||||
| :concurrency: 5 | ||||
| :queues: | ||||
|   - default | ||||
|   - push | ||||
|   - pull | ||||
|   - mailers | ||||
| :schedule: | ||||
|   subscriptions_scheduler: | ||||
|     cron: '0 5 * * *' | ||||
|     class: Scheduler::SubscriptionsScheduler | ||||
|   | ||||
| @@ -13,7 +13,7 @@ module Mastodon | ||||
|     end | ||||
|  | ||||
|     def patch | ||||
|       2 | ||||
|       3 | ||||
|     end | ||||
|  | ||||
|     def pre | ||||
|   | ||||
| @@ -78,10 +78,8 @@ namespace :mastodon do | ||||
|  | ||||
|     desc 'Re-subscribes to soon expiring PuSH subscriptions' | ||||
|     task refresh: :environment do | ||||
|       Account.expiring(1.day.from_now).find_each do |a| | ||||
|         Rails.logger.debug "PuSH re-subscribing to #{a.acct}" | ||||
|         SubscribeService.new.call(a) | ||||
|       end | ||||
|       # No-op | ||||
|       # This task is now executed via sidekiq-scheduler | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   | ||||
| @@ -45,8 +45,8 @@ RSpec.describe Api::SubscriptionsController, type: :controller do | ||||
|       post :update, params: { id: account.id } | ||||
|     end | ||||
|  | ||||
|     it 'returns http created' do | ||||
|       expect(response).to have_http_status(:created) | ||||
|     it 'returns http success' do | ||||
|       expect(response).to have_http_status(:success) | ||||
|     end | ||||
|  | ||||
|     it 'creates statuses for feed' do | ||||
|   | ||||
| @@ -53,10 +53,11 @@ RSpec.describe FollowService do | ||||
|     end | ||||
|  | ||||
|     describe 'unlocked account' do | ||||
|       let(:bob) { Fabricate(:user, email: 'bob@example.com', account: Fabricate(:account, username: 'bob', domain: 'example.com', salmon_url: 'http://salmon.example.com')).account } | ||||
|       let(:bob) { Fabricate(:user, email: 'bob@example.com', account: Fabricate(:account, username: 'bob', domain: 'example.com', salmon_url: 'http://salmon.example.com', hub_url: 'http://hub.example.com')).account } | ||||
|  | ||||
|       before do | ||||
|         stub_request(:post, "http://salmon.example.com/").to_return(:status => 200, :body => "", :headers => {}) | ||||
|         stub_request(:post, "http://hub.example.com/").to_return(status: 202) | ||||
|         subject.call(sender, bob.acct) | ||||
|       end | ||||
|  | ||||
| @@ -70,6 +71,10 @@ RSpec.describe FollowService do | ||||
|           xml.match(TagManager::VERBS[:follow]) | ||||
|         }).to have_been_made.once | ||||
|       end | ||||
|  | ||||
|       it 'subscribes to PuSH' do | ||||
|         expect(a_request(:post, "http://hub.example.com/")).to have_been_made.once | ||||
|       end | ||||
|     end | ||||
|   end | ||||
| end | ||||
|   | ||||
| @@ -154,4 +154,48 @@ XML | ||||
|     expect(created_statuses.first.reblog.account_id).to eq good_actor.id | ||||
|     expect(created_statuses.first.reblog.text).to eq 'Overwatch rocks' | ||||
|   end | ||||
|  | ||||
|   it 'ignores statuses with an out-of-order delete' do | ||||
|     sender = Fabricate(:account, username: 'tracer', domain: 'overwatch.com') | ||||
|  | ||||
|     delete_body = <<XML | ||||
| <?xml version="1.0"?> | ||||
| <entry xmlns="http://www.w3.org/2005/Atom" xmlns:thr="http://purl.org/syndication/thread/1.0" xmlns:activity="http://activitystrea.ms/spec/1.0/" xmlns:poco="http://portablecontacts.net/spec/1.0" xmlns:media="http://purl.org/syndication/atommedia" xmlns:ostatus="http://ostatus.org/schema/1.0" xmlns:mastodon="http://mastodon.social/schema/1.0"> | ||||
|   <id>tag:overwatch.com,2017-04-27:objectId=4487555:objectType=Status</id> | ||||
|   <published>2017-04-27T13:49:25Z</published> | ||||
|   <updated>2017-04-27T13:49:25Z</updated> | ||||
|   <activity:object-type>http://activitystrea.ms/schema/1.0/note</activity:object-type> | ||||
|   <activity:verb>http://activitystrea.ms/schema/1.0/delete</activity:verb> | ||||
|   <author> | ||||
|     <id>https://overwatch.com/users/tracer</id> | ||||
|     <activity:object-type>http://activitystrea.ms/schema/1.0/person</activity:object-type> | ||||
|     <uri>https://overwatch.com/users/tracer</uri> | ||||
|     <name>tracer</name> | ||||
|   </author> | ||||
| </entry> | ||||
| XML | ||||
|  | ||||
|     status_body = <<XML | ||||
| <?xml version="1.0"?> | ||||
| <entry xmlns="http://www.w3.org/2005/Atom" xmlns:thr="http://purl.org/syndication/thread/1.0" xmlns:activity="http://activitystrea.ms/spec/1.0/" xmlns:poco="http://portablecontacts.net/spec/1.0" xmlns:media="http://purl.org/syndication/atommedia" xmlns:ostatus="http://ostatus.org/schema/1.0" xmlns:mastodon="http://mastodon.social/schema/1.0"> | ||||
|   <id>tag:overwatch.com,2017-04-27:objectId=4487555:objectType=Status</id> | ||||
|   <published>2017-04-27T13:49:25Z</published> | ||||
|   <updated>2017-04-27T13:49:25Z</updated> | ||||
|   <activity:object-type>http://activitystrea.ms/schema/1.0/note</activity:object-type> | ||||
|   <activity:verb>http://activitystrea.ms/schema/1.0/post</activity:verb> | ||||
|   <author> | ||||
|     <id>https://overwatch.com/users/tracer</id> | ||||
|     <activity:object-type>http://activitystrea.ms/schema/1.0/person</activity:object-type> | ||||
|     <uri>https://overwatch.com/users/tracer</uri> | ||||
|     <name>tracer</name> | ||||
|   </author> | ||||
|   <content type="html">Overwatch rocks</content> | ||||
| </entry> | ||||
| XML | ||||
|  | ||||
|     subject.call(delete_body, sender) | ||||
|     created_statuses = subject.call(status_body, sender) | ||||
|  | ||||
|     expect(created_statuses).to be_empty | ||||
|   end | ||||
| end | ||||
|   | ||||
							
								
								
									
										38
									
								
								spec/services/subscribe_service_spec.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								spec/services/subscribe_service_spec.rb
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,38 @@ | ||||
| require 'rails_helper' | ||||
|  | ||||
| RSpec.describe SubscribeService do | ||||
|   let(:account) { Fabricate(:account, username: 'bob', domain: 'example.com', hub_url: 'http://hub.example.com') } | ||||
|   subject { SubscribeService.new } | ||||
|  | ||||
|   it 'sends subscription request to PuSH hub' do | ||||
|     stub_request(:post, 'http://hub.example.com/').to_return(status: 202) | ||||
|     subject.call(account) | ||||
|     expect(a_request(:post, 'http://hub.example.com/')).to have_been_made.once | ||||
|   end | ||||
|  | ||||
|   it 'generates and keeps PuSH secret on successful call' do | ||||
|     stub_request(:post, 'http://hub.example.com/').to_return(status: 202) | ||||
|     subject.call(account) | ||||
|     expect(account.secret).to_not be_blank | ||||
|   end | ||||
|  | ||||
|   it 'fails silently if PuSH hub forbids subscription' do | ||||
|     stub_request(:post, 'http://hub.example.com/').to_return(status: 403) | ||||
|     subject.call(account) | ||||
|   end | ||||
|  | ||||
|   it 'fails silently if PuSH hub is not found' do | ||||
|     stub_request(:post, 'http://hub.example.com/').to_return(status: 404) | ||||
|     subject.call(account) | ||||
|   end | ||||
|  | ||||
|   it 'fails loudly if there is a network error' do | ||||
|     stub_request(:post, 'http://hub.example.com/').to_raise(HTTP::Error) | ||||
|     expect { subject.call(account) }.to raise_error HTTP::Error | ||||
|   end | ||||
|  | ||||
|   it 'fails loudly if PuSH hub is unavailable' do | ||||
|     stub_request(:post, 'http://hub.example.com/').to_return(status: 503) | ||||
|     expect { subject.call(account) }.to raise_error | ||||
|   end | ||||
| end | ||||
							
								
								
									
										19
									
								
								spec/workers/scheduler/subscriptions_scheduler_spec.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										19
									
								
								spec/workers/scheduler/subscriptions_scheduler_spec.rb
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,19 @@ | ||||
| require 'rails_helper' | ||||
|  | ||||
| describe Scheduler::SubscriptionsScheduler do | ||||
|   subject { Scheduler::SubscriptionsScheduler.new } | ||||
|  | ||||
|   let!(:expiring_account1) { Fabricate(:account, subscription_expires_at: 20.minutes.from_now, domain: 'example.com', followers_count: 1, hub_url: 'http://hub.example.com') } | ||||
|   let!(:expiring_account2) { Fabricate(:account, subscription_expires_at: 4.hours.from_now, domain: 'example.org', followers_count: 1, hub_url: 'http://hub.example.org') } | ||||
|  | ||||
|   before do | ||||
|     stub_request(:post, 'http://hub.example.com/').to_return(status: 202) | ||||
|     stub_request(:post, 'http://hub.example.org/').to_return(status: 202) | ||||
|   end | ||||
|  | ||||
|   it 're-subscribes for all expiring accounts' do | ||||
|     subject.perform | ||||
|     expect(a_request(:post, 'http://hub.example.com/')).to have_been_made.once | ||||
|     expect(a_request(:post, 'http://hub.example.org/')).to have_been_made.once | ||||
|   end | ||||
| end | ||||
		Reference in New Issue
	
	Block a user