Merge pull request #518 from warpc/510-resque_migrate

[refs #510] Replace dj to resque
This commit is contained in:
Vladimir Sharshov 2012-06-09 08:34:41 -07:00
commit aa3b631f20
19 changed files with 74 additions and 97 deletions

View File

@ -14,7 +14,7 @@ gem 'cancan', '~> 1.6.7'
gem 'ancestry', '~> 1.2.5' gem 'ancestry', '~> 1.2.5'
gem 'paperclip', '~> 3.0.2' gem 'paperclip', '~> 3.0.2'
gem 'delayed_job_active_record', '~> 0.3.2' #gem 'delayed_job_active_record', '~> 0.3.2'
gem 'resque' gem 'resque'
gem 'russian', '~> 0.6.0' gem 'russian', '~> 0.6.0'
gem 'highline', '~> 1.6.11' gem 'highline', '~> 1.6.11'

View File

@ -96,11 +96,6 @@ GEM
compass (~> 0.12.0) compass (~> 0.12.0)
creole (0.4.2) creole (0.4.2)
daemons (1.1.6) daemons (1.1.6)
delayed_job (3.0.2)
activesupport (~> 3.0)
delayed_job_active_record (0.3.2)
activerecord (> 2.1.0)
delayed_job (~> 3.0.0)
devise (2.0.4) devise (2.0.4)
bcrypt-ruby (~> 3.0) bcrypt-ruby (~> 3.0)
orm_adapter (~> 0.0.3) orm_adapter (~> 0.0.3)
@ -340,7 +335,6 @@ DEPENDENCIES
compass-rails (~> 1.0.1) compass-rails (~> 1.0.1)
creole creole
daemons (= 1.1.6) daemons (= 1.1.6)
delayed_job_active_record (~> 0.3.2)
devise (~> 2.0.4) devise (~> 2.0.4)
diff-display (~> 0.0.1) diff-display (~> 0.0.1)
factory_girl_rails (~> 3.1.0) factory_girl_rails (~> 3.1.0)

View File

@ -98,7 +98,7 @@ class Platforms::PlatformsController < Platforms::BaseController
end end
def destroy def destroy
@platform.delay.destroy if @platform @platform.async(:destroy) if @platform
flash[:notice] = t("flash.platform.destroyed") flash[:notice] = t("flash.platform.destroyed")
redirect_to platforms_path redirect_to platforms_path

View File

@ -132,7 +132,7 @@ class Projects::BuildListsController < Projects::BaseController
render :nothing => true, :status => 200 render :nothing => true, :status => 200
@build_list.delay.publish if @build_list.auto_publish # && @build_list.can_publish? @build_list.async(:publish) if @build_list.auto_publish # && @build_list.can_publish?
end end
def circle_build def circle_build

View File

@ -258,7 +258,7 @@ class Projects::WikiController < Projects::BaseController
# @committer.after_commit do |committer, sha1| # @committer.after_commit do |committer, sha1|
# here goes callback for notification # here goes callback for notification
# end # end
ActivityFeedObserver.instance.after_create(@committer).delay ActivityFeedObserver.instance.async(:after_create, @committer)
end end
@committer @committer
end end

View File

@ -3,6 +3,10 @@
class UserMailer < ActionMailer::Base class UserMailer < ActionMailer::Base
default :from => APP_CONFIG['do-not-reply-email'] default :from => APP_CONFIG['do-not-reply-email']
include Modules::Models::ResqueAsyncMethods
@queue = :notifications
def new_user_notification(user) def new_user_notification(user)
@user = user @user = user
mail(:to => user.email, :subject => I18n.t("notifications.subjects.new_user_notification", :project_name => APP_CONFIG['project_name'])) do |format| mail(:to => user.email, :subject => I18n.t("notifications.subjects.new_user_notification", :project_name => APP_CONFIG['project_name'])) do |format|

View File

@ -2,6 +2,10 @@
class ActivityFeedObserver < ActiveRecord::Observer class ActivityFeedObserver < ActiveRecord::Observer
observe :issue, :comment, :user, :build_list observe :issue, :comment, :user, :build_list
include Modules::Models::ResqueAsyncMethods
@queue = :notifications
def after_create(record) def after_create(record)
case record.class.to_s case record.class.to_s
when 'User' when 'User'
@ -15,7 +19,7 @@ class ActivityFeedObserver < ActiveRecord::Observer
recipients = record.collect_recipient_ids recipients = record.collect_recipient_ids
recipients.each do |recipient_id| recipients.each do |recipient_id|
recipient = User.find(recipient_id) recipient = User.find(recipient_id)
UserMailer.delay.new_issue_notification(record, recipient) if User.find(recipient).notifier.can_notify && User.find(recipient).notifier.new_issue UserMailer.async(:new_issue_notification, record, recipient) if User.find(recipient).notifier.can_notify && User.find(recipient).notifier.new_issue
ActivityFeed.create( ActivityFeed.create(
:user => recipient, :user => recipient,
:kind => 'new_issue_notification', :kind => 'new_issue_notification',
@ -25,7 +29,7 @@ class ActivityFeedObserver < ActiveRecord::Observer
end end
if record.assignee_id_changed? if record.assignee_id_changed?
UserMailer.delay.issue_assign_notification(record, record.assignee) if record.assignee.notifier.issue_assign && record.assignee.notifier.can_notify UserMailer.async(:new_issue_notification, record, record.assignee) if record.assignee.notifier.issue_assign && record.assignee.notifier.can_notify
ActivityFeed.create( ActivityFeed.create(
:user => record.user, :user => record.user,
:kind => 'issue_assign_notification', :kind => 'issue_assign_notification',
@ -39,7 +43,7 @@ class ActivityFeedObserver < ActiveRecord::Observer
subscribes = record.commentable.subscribes subscribes = record.commentable.subscribes
subscribes.each do |subscribe| subscribes.each do |subscribe|
if record.user_id != subscribe.user_id if record.user_id != subscribe.user_id
UserMailer.delay.new_comment_notification(record, subscribe.user) if record.can_notify_on_new_comment?(subscribe) UserMailer.async(:new_comment_notification, record, subscribe.user) if record.can_notify_on_new_comment?(subscribe)
ActivityFeed.create( ActivityFeed.create(
:user => subscribe.user, :user => subscribe.user,
:kind => 'new_comment_notification', :kind => 'new_comment_notification',
@ -57,7 +61,7 @@ class ActivityFeedObserver < ActiveRecord::Observer
( (subscribe.project.owner?(subscribe.user) && subscribe.user.notifier.new_comment_commit_repo_owner) or ( (subscribe.project.owner?(subscribe.user) && subscribe.user.notifier.new_comment_commit_repo_owner) or
(subscribe.user.commentor?(record.commentable) && subscribe.user.notifier.new_comment_commit_commentor) or (subscribe.user.commentor?(record.commentable) && subscribe.user.notifier.new_comment_commit_commentor) or
(subscribe.user.committer?(record.commentable) && subscribe.user.notifier.new_comment_commit_owner) ) (subscribe.user.committer?(record.commentable) && subscribe.user.notifier.new_comment_commit_owner) )
UserMailer.delay.new_comment_notification(record, subscribe.user) UserMailer.async(:new_comment_notification, record, subscribe.user)
end end
ActivityFeed.create( ActivityFeed.create(
:user => subscribe.user, :user => subscribe.user,
@ -122,7 +126,7 @@ class ActivityFeedObserver < ActiveRecord::Observer
case record.class.to_s case record.class.to_s
when 'Issue' when 'Issue'
if record.assignee_id && record.assignee_id_changed? if record.assignee_id && record.assignee_id_changed?
UserMailer.delay.issue_assign_notification(record, record.assignee) if record.assignee.notifier.issue_assign && record.assignee.notifier.can_notify UserMailer.async(:issue_assign_notification, record, record.assignee) if record.assignee.notifier.issue_assign && record.assignee.notifier.can_notify
ActivityFeed.create( ActivityFeed.create(
:user => record.assignee, :user => record.assignee,
:kind => 'issue_assign_notification', :kind => 'issue_assign_notification',

View File

@ -100,6 +100,10 @@ class BuildList < ActiveRecord::Base
after_create :place_build after_create :place_build
after_destroy :delete_container after_destroy :delete_container
include Modules::Models::ResqueAsyncMethods
@queue = :clone_and_build
def self.human_status(status) def self.human_status(status)
I18n.t("layout.build_lists.statuses.#{HUMAN_STATUSES[status]}") I18n.t("layout.build_lists.statuses.#{HUMAN_STATUSES[status]}")
end end

View File

@ -12,6 +12,10 @@ class MassBuild < ActiveRecord::Base
after_create :build_all after_create :build_all
include Modules::Models::ResqueAsyncMethods
@queue = :clone_and_build
def initialize(args = nil) def initialize(args = nil)
super super
@ -24,7 +28,7 @@ class MassBuild < ActiveRecord::Base
# ATTENTION: repositories and arches must be set before calling this method! # ATTENTION: repositories and arches must be set before calling this method!
def build_all def build_all
platform.delay.build_all( platform.async(:build_all,
:mass_build_id => self.id, :mass_build_id => self.id,
:user => self.user, :user => self.user,
:repositories => self.repositories, :repositories => self.repositories,

View File

@ -47,6 +47,9 @@ class Platform < ActiveRecord::Base
attr_readonly :name, :distrib_type, :parent_platform_id, :platform_type attr_readonly :name, :distrib_type, :parent_platform_id, :platform_type
include Modules::Models::Owner include Modules::Models::Owner
include Modules::Models::ResqueAsyncMethods
@queue = :clone_and_build
def urpmi_list(host, pair = nil) def urpmi_list(host, pair = nil)
blank_pair = {:login => 'login', :pass => 'password'} blank_pair = {:login => 'login', :pass => 'password'}
@ -118,7 +121,7 @@ class Platform < ActiveRecord::Base
def full_clone(attrs = {}) def full_clone(attrs = {})
base_clone(attrs).tap do |c| base_clone(attrs).tap do |c|
with_skip {c.save} and c.clone_relations(self) and c.delay.xml_rpc_clone with_skip {c.save} and c.clone_relations(self) and c.async(:xml_rpc_clone)
end end
end end
@ -172,7 +175,7 @@ class Platform < ActiveRecord::Base
begin begin
p.build_for(self, user, arch, auto_publish, mass_build_id) p.build_for(self, user, arch, auto_publish, mass_build_id)
rescue RuntimeError, Exception rescue RuntimeError, Exception
p.delay.build_for(self, user, arch, auto_publish, mass_build_id) p.async(:build_for, self, user, arch, auto_publish, mass_build_id)
end end
end end
end end

View File

@ -54,20 +54,9 @@ class Project < ActiveRecord::Base
has_attached_file :srpm has_attached_file :srpm
include Modules::Models::Owner include Modules::Models::Owner
include Modules::Models::ResqueAsyncMethods
@queue = :fork_and_import @queue = :fork_import_hook
# This will be called by a worker when a job needs to be processed
def self.perform(id, method, *args)
find(id).send(method, *args)
end
# We can pass this any Repository instance method that we want to
# run later.
def async(method, *args)
Resque.enqueue(Project, id, method, *args)
end
def to_param def to_param
name name
@ -305,7 +294,7 @@ class Project < ActiveRecord::Base
hook = File.join(::Rails.root.to_s, 'tmp', "post-receive-hook") hook = File.join(::Rails.root.to_s, 'tmp', "post-receive-hook")
FileUtils.cp(File.join(::Rails.root.to_s, 'bin', "post-receive-hook.partial"), hook) FileUtils.cp(File.join(::Rails.root.to_s, 'bin', "post-receive-hook.partial"), hook)
File.open(hook, 'a') do |f| File.open(hook, 'a') do |f|
s = "\n /bin/bash -l -c \"cd #{is_production ? '/srv/rosa_build/current' : Rails.root.to_s} && #{is_production ? 'RAILS_ENV=production' : ''} bundle exec rails runner 'Project.delay(:queue => \\\"hook\\\").process_hook(\\\"$owner\\\", \\\"$reponame\\\", \\\"$newrev\\\", \\\"$oldrev\\\", \\\"$ref\\\", \\\"$newrev_type\\\", \\\"$oldrev_type\\\")'\"" s = "\n /bin/bash -l -c \"cd #{is_production ? '/srv/rosa_build/current' : Rails.root.to_s} && #{is_production ? 'RAILS_ENV=production' : ''} bundle exec rails runner 'Project.async(:process_hook, \\\"$owner\\\", \\\"$reponame\\\", \\\"$newrev\\\", \\\"$oldrev\\\", \\\"$ref\\\", \\\"$newrev_type\\\", \\\"$oldrev_type\\\")'\""
s << " > /dev/null 2>&1" if is_production s << " > /dev/null 2>&1" if is_production
s << "\ndone\n" s << "\ndone\n"
f.write(s) f.write(s)

View File

@ -16,6 +16,10 @@ class Repository < ActiveRecord::Base
attr_accessible :name, :description attr_accessible :name, :description
attr_readonly :name, :platform_id attr_readonly :name, :platform_id
include Modules::Models::ResqueAsyncMethods
@queue = :clone_and_build
def base_clone(attrs = {}) def base_clone(attrs = {})
dup.tap do |c| dup.tap do |c|
c.platform_id = nil c.platform_id = nil
@ -32,7 +36,7 @@ class Repository < ActiveRecord::Base
def full_clone(attrs = {}) def full_clone(attrs = {})
base_clone(attrs).tap do |c| base_clone(attrs).tap do |c|
with_skip {c.save} and c.delay.clone_relations(self) with_skip {c.save} and c.async(:clone_relations, self)
end end
end end

View File

@ -36,9 +36,8 @@ set :deploy_via, :remote_cache
require './lib/recipes/nginx' require './lib/recipes/nginx'
require './lib/recipes/unicorn' require './lib/recipes/unicorn'
#require './lib/recipes/bluepill' #require './lib/recipes/bluepill'
require './lib/recipes/delayed_job'
set :workers_count, 2 set :workers_count, 4
require './lib/recipes/resque' require './lib/recipes/resque'
@ -92,11 +91,6 @@ after "deploy:setup", "deploy:symlink_pids"
#after "deploy:start", "bluepill:start" #after "deploy:start", "bluepill:start"
#after "deploy:stop", "bluepill:stop" #after "deploy:stop", "bluepill:stop"
# DJ
after "deploy:stop", "delayed_job:stop"
after "deploy:start", "delayed_job:start"
after "deploy:restart", "delayed_job:restart"
# Resque # Resque
after "deploy:stop", "resque:stop" after "deploy:stop", "resque:stop"
after "deploy:start", "resque:start" after "deploy:start", "resque:start"

View File

@ -4,17 +4,6 @@ app_name = ENV['APP_NAME'] || 'rosa_build'
Bluepill.application(app_name, :log_file => "/srv/rosa_build/shared/log/bluepill.log") do |app| Bluepill.application(app_name, :log_file => "/srv/rosa_build/shared/log/bluepill.log") do |app|
app.uid = app.gid = 'rosa' app.uid = app.gid = 'rosa'
app.working_dir = "/srv/#{app_name}/current" app.working_dir = "/srv/#{app_name}/current"
#%w(hook default).each do |queue|
# app.process("delayed_job_#{queue}_queue") do |process|
# process.start_grace_time = 10.seconds
# process.stop_grace_time = 10.seconds
# process.restart_grace_time = 10.seconds
# process.start_command = "/usr/bin/env ruby script/delayed_job --queue=#{queue} -p #{queue} --pid-dir=/srv/#{app_name}/current/tmp/#{queue}_pids start"
# process.stop_command = "/usr/bin/env ruby script/delayed_job --pid-dir=/srv/#{app_name}/current/tmp/#{queue}_pids stop"
# process.pid_file = File.join(app.working_dir, 'tmp', "#{queue}_pids", 'delayed_job.pid')
# end
#end
app.process("resque") do |process| app.process("resque") do |process|
process.group = "resque" process.group = "resque"

View File

@ -0,0 +1,32 @@
# -*- encoding : utf-8 -*-
module Modules
module Models
module ResqueAsyncMethods
extend ActiveSupport::Concern
included do
# We can pass this any Repository instance method that we want to
# run later.
def async(method, *args)
Resque.enqueue(self.class, id, method, *args)
end
end
module ClassMethods
# This will be called by a worker when a job needs to be processed
def perform(id, method, *args)
unless id.nil?
find(id).send(method, *args)
else
send(method, *args)
end
end
def async(method, *args)
Resque.enqueue(self, nil, method, *args)
end
end
end
end
end

View File

@ -23,13 +23,6 @@ Capistrano::Configuration.instance(:must_exist).load do
task :status, :roles => [:app] do task :status, :roles => [:app] do
run "cd #{fetch :current_path} && #{try_sudo} #{bluepill_binary} #{fetch :application} status" run "cd #{fetch :current_path} && #{try_sudo} #{bluepill_binary} #{fetch :application} status"
end end
#desc "Restart DJ processes"
#task :restart_dj, :roles => [:app] do
# %w(fork import hook default).each do |queue|
# run "cd #{fetch :current_path} && #{try_sudo} #{bluepill_binary} #{fetch :application} restart delayed_job_#{queue}_queue"
# end
#end
end end
desc "Start a bluepill process and load a config" desc "Start a bluepill process and load a config"

View File

@ -1,37 +0,0 @@
# -*- encoding : utf-8 -*-
Capistrano::Configuration.instance(:must_exist).load do
namespace :delayed_job do
def dj_queues
%w(hook default)
end
def rails_env
fetch(:rails_env, false) ? "RAILS_ENV=#{fetch(:rails_env)}" : ''
end
def roles
fetch(:delayed_job_server_role, :app)
end
desc "Stop the delayed_job process"
task :stop, :roles => lambda { roles } do
dj_queues.each do |queue|
run "cd #{current_path};#{rails_env} script/delayed_job --pid-dir=#{shared_path}/pids/#{queue} stop"
end
end
desc "Start the delayed_job process"
task :start, :roles => lambda { roles } do
dj_queues.each do |queue|
run "cd #{current_path};#{rails_env} script/delayed_job --queue=#{queue} -p #{queue} --pid-dir=#{shared_path}/pids/#{queue} start"
end
end
desc "Restart the delayed_job process"
task :restart, :roles => lambda { roles } do
dj_queues.each do |queue|
run "cd #{current_path};#{rails_env} script/delayed_job --queue=#{queue} -p #{queue} --pid-dir=#{shared_path}/pids/#{queue} restart"
end
end
end
end

View File

@ -24,7 +24,7 @@ Capistrano::Configuration.instance(:must_exist).load do
end end
def start_workers def start_workers
run "cd #{fetch :current_path} && COUNT=#{ workers_count } QUEUE=fork_and_import #{ rails_env } BACKGROUND=yes bundle exec rake resque:workers" run "cd #{fetch :current_path} && COUNT=#{ workers_count } QUEUE=fork_import_hook,clone_and_build,notifications #{ rails_env } BACKGROUND=yes bundle exec rake resque:workers"
end end
end end
end end

View File

@ -6,7 +6,7 @@ namespace :hook do
hook = File.join(::Rails.root.to_s, 'tmp', "post-receive-hook") hook = File.join(::Rails.root.to_s, 'tmp', "post-receive-hook")
FileUtils.cp(File.join(::Rails.root.to_s, 'bin', "post-receive-hook.partial"), hook) FileUtils.cp(File.join(::Rails.root.to_s, 'bin', "post-receive-hook.partial"), hook)
File.open(hook, 'a') do |f| File.open(hook, 'a') do |f|
s = "\n /bin/bash -l -c \"cd #{is_production ? '/srv/rosa_build/current' : Rails.root.to_s} && #{is_production ? 'RAILS_ENV=production' : ''} bundle exec rails runner 'Project.delay(:queue => \\\"hook\\\").process_hook(\\\"$owner\\\", \\\"$reponame\\\", \\\"$newrev\\\", \\\"$oldrev\\\", \\\"$ref\\\", \\\"$newrev_type\\\", \\\"$oldrev_type\\\")'\"" s = "\n /bin/bash -l -c \"cd #{is_production ? '/srv/rosa_build/current' : Rails.root.to_s} && #{is_production ? 'RAILS_ENV=production' : ''} bundle exec rails runner 'Project.async(:process_hook, \\\"$owner\\\", \\\"$reponame\\\", \\\"$newrev\\\", \\\"$oldrev\\\", \\\"$ref\\\", \\\"$newrev_type\\\", \\\"$oldrev_type\\\")'\""
s << " > /dev/null 2>&1" if is_production s << " > /dev/null 2>&1" if is_production
s << "\ndone\n" s << "\ndone\n"
f.write(s) f.write(s)