Merge pull request #362 from abf/rosa-build:parallel-build-lists

own queues for each user
This commit is contained in:
avokhmin 2014-02-27 23:46:27 +04:00
commit 2d4f59d33e
20 changed files with 213 additions and 73 deletions

View File

@ -84,7 +84,7 @@ group :assets do
gem 'coffee-rails', '~> 3.2.2'
gem 'compass-rails', '~> 1.0.3'
gem 'uglifier', '~> 1.2.4'
gem 'therubyracer', '~> 0.10.2', platforms: [:mri, :rbx]
gem 'therubyracer', '~> 0.12.1', platforms: [:mri, :rbx]
gem 'therubyrhino', '~> 1.73.1', platforms: :jruby
gem 'turbo-sprockets-rails3'
end

View File

@ -192,7 +192,7 @@ GEM
json (1.8.1)
jwt (0.1.10)
multi_json (>= 1.5)
libv8 (3.3.10.4)
libv8 (3.16.14.3)
localeapp (0.6.14)
gli
i18n
@ -320,7 +320,7 @@ GEM
rdoc (3.12.2)
json (~> 1.4)
redcarpet (2.2.2)
redis (3.0.6)
redis (3.0.7)
redis-actionpack (3.2.4)
actionpack (~> 3.2.0)
redis-rack (~> 1.4.4)
@ -342,13 +342,14 @@ GEM
redisk (0.2.2)
redis (>= 0.1.1)
redis-namespace (>= 0.1.0)
ref (1.0.5)
resque (1.25.1)
mono_logger (~> 1.0)
multi_json (~> 1.0)
redis-namespace (~> 1.2)
sinatra (>= 0.9.2)
vegas (~> 0.1.2)
resque-scheduler (2.5.3)
resque-scheduler (2.5.4)
redis (~> 3.0.4)
resque (~> 1.25.1)
rufus-scheduler (~> 2.0.24)
@ -420,8 +421,9 @@ GEM
state_machine (1.2.0)
stringex (1.4.0)
systemu (2.5.2)
therubyracer (0.10.2)
libv8 (~> 3.3.10)
therubyracer (0.12.1)
libv8 (~> 3.16.14.0)
ref
thin (1.5.1)
daemons (>= 1.0.9)
eventmachine (>= 0.12.6)
@ -540,7 +542,7 @@ DEPENDENCIES
shoulda
soundmanager-rails
state_machine
therubyracer (~> 0.10.2)
therubyracer (~> 0.12.1)
therubyrhino (~> 1.73.1)
time_diff
trinidad (~> 1.0.2)

View File

@ -6,9 +6,4 @@ require 'rake'
require 'resque/tasks'
require 'resque_scheduler/tasks'
# This fixes connection fail with Postgres server on new fork:
task "resque:setup" => :environment do
Resque.before_fork = Proc.new { ActiveRecord::Base.establish_connection }
end
Rosa::Application.load_tasks

View File

@ -7,42 +7,39 @@ class Api::V1::JobsController < Api::V1::BaseController
before_filter :authenticate_user!
def shift
platform_ids = Platform.where(name: params[:platforms].split(',')).pluck(:id) if params[:platforms].present?
arch_ids = Arch.where(name: params[:arches].split(',')).pluck(:id) if params[:arches].present?
build_lists = BuildList.for_status(BuildList::BUILD_PENDING).scoped_to_arch(arch_ids).
oldest.order(:created_at)
build_lists = build_lists.for_platform(platform_ids) if platform_ids.present?
if current_user.system?
if task = (Resque.pop('rpm_worker_default') || Resque.pop('rpm_worker'))
@build_list = BuildList.where(id: task['args'][0]['id']).first
@build_list.delayed_add_job_to_abf_worker_queue
end
end
@build_list = BuildList.next_build if current_user.system?
unless @build_list
platform_ids = Platform.where(name: params[:platforms].split(',')).pluck(:id) if params[:platforms].present?
arch_ids = Arch.where(name: params[:arches].split(',')).pluck(:id) if params[:arches].present?
build_lists = BuildList.for_status(BuildList::BUILD_PENDING).scoped_to_arch(arch_ids).
oldest.order(:created_at)
build_lists = build_lists.for_platform(platform_ids) if platform_ids.present?
ActiveRecord::Base.transaction do
if current_user.system?
@build_list ||= build_lists.external_nodes(:everything).first
@build_list.touch if @build_list
else
@build_list = build_lists.external_nodes(:owned).for_user(current_user).first
@build_list ||= build_lists.external_nodes(:everything).
accessible_by(current_ability, :everything).readonly(false).first
ActiveRecord::Base.transaction do
if current_user.system?
@build_list ||= build_lists.external_nodes(:everything).first
@build_list.touch if @build_list
else
@build_list = build_lists.external_nodes(:owned).for_user(current_user).first
@build_list ||= build_lists.external_nodes(:everything).
accessible_by(current_ability, :everything).readonly(false).first
if @build_list
@build_list.builder = current_user
@build_list.save
if @build_list
@build_list.builder = current_user
@build_list.save
end
end
end
end unless @build_list
if @build_list
job = {
worker_queue: @build_list.worker_queue_with_priority,
worker_class: @build_list.worker_queue_class,
:worker_args => [@build_list.abf_worker_args]
}
end
job = {
worker_queue: @build_list.worker_queue_with_priority(false),
worker_class: @build_list.worker_queue_class,
:worker_args => [@build_list.abf_worker_args]
} if @build_list
render json: { job: job }.to_json
end

View File

@ -0,0 +1,8 @@
# Resque job that delegates to AbfWorker::BuildListsPublishTaskManager,
# which creates the publishing tasks for built packages.
# Scheduled every 3 minutes via config/resque_schedule.yml.
class BuildListsPublishTaskManagerJob
  @queue = :hook

  class << self
    # Entry point invoked by the Resque scheduler.
    def perform
      AbfWorker::BuildListsPublishTaskManager.new.run
    end
  end
end

View File

@ -0,0 +1,49 @@
# Resque job that keeps the dynamic per-user / per-mass-build queues tidy:
# - empty queues with no recent activity are deleted from Redis, and
# - non-empty queues are re-registered in the sets that
#   BuildList.next_build draws from.
# Scheduled every minute via config/resque_schedule.yml.
class BuildListsQueuesMonitoringJob
  @queue = :hook

  def self.perform
    redis.smembers('queues').each do |key|
      # Only inspect the dynamically created build queues.
      next if key !~ /(user|mass)_build_/
      queue = "queue:#{key}"
      # The user id or mass-build id is embedded in the queue name.
      id = key.gsub(/[^\d]/, '')
      if redis.llen(queue) == 0
        if key =~ /^user/
          last_updated_at = BuildList.select(:updated_at).
            where(user_id: id).order('updated_at DESC').first
        else
          # FIX: was `where(id: 250)` — a hard-coded id left over from
          # debugging; look up the mass build this queue actually belongs to.
          last_updated_at = MassBuild.select(:updated_at).where(id: id).first
        end
        last_updated_at = last_updated_at.try(:updated_at)
        # cleans queue if no activity and tasks for this queue
        clean(key) if !last_updated_at || (last_updated_at + 5.minutes) < Time.zone.now
      else
        # ensures that user/mass-build in the set from which we select next jobs
        set_key = key =~ /^user/ ? BuildList::USER_BUILDS_SET : BuildList::MASS_BUILDS_SET
        redis.sadd set_key, id
      end
    end
  end

  # Atomically deletes an (expected-to-be) empty queue and removes it from
  # the 'queues' registry set. WATCH aborts the MULTI when the queue is
  # modified concurrently; in that case nothing happens and the next
  # scheduled run retries.
  def self.clean(key)
    queue = "queue:#{key}"
    # See [#watch]: https://github.com/redis/redis-rb/blob/master/lib/redis.rb#L2012
    redis.watch(queue) do
      if redis.llen(queue) == 0
        redis.multi do |multi|
          multi.del queue
          multi.srem 'queues', key
        end
      else
        redis.unwatch
      end
    end
  end

  # Memoized Resque Redis connection.
  def self.redis
    @redis ||= Resque.redis
  end
end

View File

@ -0,0 +1,10 @@
# Resque job that removes RPM build-node records not associated with a user.
# Scheduled every minute via config/resque_schedule.yml (replaces the old
# `RpmBuildNode.cleanup!` whenever entry).
class CleanRpmBuildNodeJob
  @queue = :hook

  # Deletes every node whose user_id is absent (nil/false).
  def self.perform
    RpmBuildNode.all.reject(&:user_id).each(&:delete)
  end
end

View File

@ -174,7 +174,7 @@ class BuildList < ActiveRecord::Base
# build_canceling: :build_canceled - canceling from UI
# build_started: :build_canceled - canceling from worker by time-out (time_living has been expired)
event :build_canceled do
transition [:build_canceling, :build_started] => :build_canceled
transition [:build_canceling, :build_started, :build_pending] => :build_canceled
end
event :published do
@ -526,6 +526,26 @@ class BuildList < ActiveRecord::Base
)
end
# Pops the next pending build task from the dynamic Resque queues,
# preferring user builds over mass builds, and returns the matching
# BuildList (or nil when no task is available).
def self.next_build
  redis = Resque.redis
  # SPOP takes a random user id out of the rotation set, which spreads
  # work across users instead of draining one user's queue first.
  kind_id = redis.spop(USER_BUILDS_SET)
  key = "user_build_#{kind_id}_rpm_worker_default" if kind_id
  task = Resque.pop(key) if key
  # Re-add the user id only when a task was actually found; ids whose
  # queues are empty drop out of the rotation until
  # BuildListsQueuesMonitoringJob re-registers them.
  redis.sadd(USER_BUILDS_SET, kind_id) if task
  # Fall back to mass-build queues when no user task was picked up.
  # NOTE(review): if a user id was popped but its queue was empty,
  # kind_id/key stay set and the mass-build fallback is skipped for this
  # call — presumably acceptable since the job retries; confirm intended.
  kind_id ||= redis.spop(MASS_BUILDS_SET)
  key ||= "mass_build_#{kind_id}_rpm_worker" if kind_id
  task ||= Resque.pop(key) if key
  redis.sadd(MASS_BUILDS_SET, kind_id) if task && key =~ /^mass_build/
  if task
    # Resque payload carries the build list id as args[0]['id'].
    build_list = BuildList.where(id: task['args'][0]['id']).first
    build_list.delayed_add_job_to_abf_worker_queue
    build_list
  end
end
# Re-enqueues this build list onto its worker queue, but only while it is
# still pending; invoked by BuildList.next_build after popping the task.
# Extra arguments are accepted (and ignored) for caller compatibility.
def delayed_add_job_to_abf_worker_queue(*args)
  return unless status == BUILD_PENDING
  restart_job
end

View File

@ -19,8 +19,8 @@ class BuildListObserver < ActiveRecord::Observer
rescue ActiveRecord::RecordNotUnique
retry
end
build_count = statistic.build_count
new_av_time = ( statistic.average_build_time * build_count + record.duration ) / ( build_count + 1 )
build_count = statistic.build_count.to_i
new_av_time = ( statistic.average_build_time * build_count + record.duration.to_i ) / ( build_count + 1 )
statistic.update_attributes(average_build_time: new_av_time, build_count: build_count + 1)
end
end

View File

@ -17,12 +17,6 @@ class RpmBuildNode < Ohm::Model
User.where(id: user_id).first
end
def self.cleanup!
RpmBuildNode.all.each do |n|
n.delete unless n.user_id
end
end
def self.total_statistics
systems, others, busy = 0, 0, 0
RpmBuildNode.all.select{ |n| n.user_id }.each do |n|

View File

@ -27,6 +27,7 @@ module Rosa
# Custom directories with classes and modules you want to be autoloadable.
# config.autoload_paths += %W(#{config.root}/extras)
config.autoload_paths += %W(#{config.root}/app/presenters)
config.autoload_paths += %W(#{config.root}/app/jobs)
# Only load the plugins named here, in the order given (default is alphabetical).
# :all can be used as a placeholder for all plugins not explicitly named.

View File

@ -12,4 +12,4 @@ unless Rails.env.test?
PerformLater.config.enabled = true # this will default to false if unset
end
Resque::Plugins::Status::Hash.expire_in = (24 * 60 * 60) # 24hrs in seconds
Resque::Plugins::Status::Hash.expire_in = (24 * 60 * 60) # 24hrs in seconds

View File

@ -0,0 +1,20 @@
# Resque scheduler configuration (loaded by `rake resque:scheduler_setup`).
# Nesting restored: the standard resque-scheduler format requires
# every/class/queue/description to be indented under each job name.
clean_rpm_build_nodes:
  every:
    - '1m'
  class: 'CleanRpmBuildNodeJob'
  queue: hook
  description: 'Cleans RPM build nodes'

build_lists_publish_task_manager:
  every:
    - '3m'
  class: 'BuildListsPublishTaskManagerJob'
  queue: hook
  description: 'Creates tasks for publishing'

build_lists_queues_monitoring:
  every:
    - '1m'
  class: 'BuildListsQueuesMonitoringJob'
  queue: hook
  description: 'Monitoring for "user/mass-build" queues'

View File

@ -26,14 +26,6 @@ every :day, at: '3:00 am' do
rake 'activity_feeds:clear', output: 'log/activity_feeds.log'
end
every 3.minute do
runner 'AbfWorker::BuildListsPublishTaskManager.new.run', output: 'log/task_manager.log'
end
every 1.minute do
runner 'RpmBuildNode.cleanup!'
end
every 1.hour do
rake 'buildlist:clear:outdated_canceling', output: 'log/canceling_build_list_clear.log'
end

View File

@ -299,8 +299,6 @@ module AbfWorker
platform_path << '/' << build_for_platform.name
system "mkdir -p #{platform_path}"
end
worker_queue = bl ? bl.worker_queue_with_priority("publish_worker") : 'publish_worker_default'
worker_class = bl ? bl.worker_queue_class("AbfWorker::PublishWorker") : 'AbfWorker::PublishWorkerDefault'
distrib_type = build_for_platform.distrib_type
cmd_params = {
@ -350,9 +348,9 @@ module AbfWorker
packages[:sources] = new_sources.values.compact
Resque.push(
worker_queue,
'class' => worker_class,
'args' => [options.merge({
'publish_worker_default',
'class' => 'AbfWorker::PublishWorkerDefault',
'args' => [options.merge({
packages: packages,
old_packages: old_packages,
build_list_ids: build_list_ids,

View File

@ -3,6 +3,9 @@ module AbfWorker::ModelHelper
# - #abf_worker_args
# - #build_canceled
MASS_BUILDS_SET = 'abf-worker::mass-builds'
USER_BUILDS_SET = 'abf-worker::user-builds'
def self.included(base)
base.extend(ClassMethods)
end
@ -14,6 +17,11 @@ module AbfWorker::ModelHelper
port: APP_CONFIG['abf_worker']['log_server']['port']
)
end
def self.next_build
raise NotImplementedError
end
end
def abf_worker_log
@ -21,6 +29,7 @@ module AbfWorker::ModelHelper
end
def add_job_to_abf_worker_queue
update_build_sets
Resque.push(
worker_queue_with_priority,
'class' => worker_queue_class,
@ -29,6 +38,7 @@ module AbfWorker::ModelHelper
end
def restart_job
update_build_sets
Resque.redis.lpush "queue:#{worker_queue_with_priority}",
Resque.encode({'class' => worker_queue_class, 'args' => [abf_worker_args]})
end
@ -50,19 +60,40 @@ module AbfWorker::ModelHelper
)
end
def worker_queue_with_priority(queue = nil)
queue ||= abf_worker_base_queue
# Builds the Resque queue name for this model, e.g.
# "user_build_42_rpm_worker_default" or "mass_build_7_rpm_worker".
# With prefix: false (or for non-BuildList models) the per-user /
# per-mass-build prefix is omitted and only the base queue
# (plus priority suffix, when present) is returned.
def worker_queue_with_priority(prefix = true)
  queue = ''
  # Only BuildList gets its own per-user / per-mass-build queue.
  if prefix && is_a?(BuildList)
    if mass_build_id
      queue << "mass_build_#{mass_build_id}_"
    else
      queue << "user_build_#{user_id}_"
    end
  end
  queue << abf_worker_base_queue
  queue << '_' << abf_worker_priority if abf_worker_priority.present?
  queue
end
def worker_queue_class(queue_class = nil)
queue_class ||= "AbfWorker::#{abf_worker_base_queue.classify}"
queue_class << abf_worker_priority.capitalize
# Worker class name for this model's queue, derived from the base queue
# name and priority, e.g. "AbfWorker::RpmWorkerDefault".
def worker_queue_class
  "AbfWorker::#{abf_worker_base_queue.classify}#{abf_worker_priority.capitalize}"
end
private
# Registers this build list's id and queue name in the Redis bookkeeping
# sets consumed by BuildList.next_build and BuildListsQueuesMonitoringJob.
# No-op for other models — this helper module is mixed into several.
def update_build_sets
  return unless is_a?(BuildList)
  key = mass_build_id ? MASS_BUILDS_SET : USER_BUILDS_SET
  # Pipelined to save a network round-trip; the two SADDs are independent.
  Resque.redis.pipelined do
    Resque.redis.sadd key, mass_build_id || user_id
    Resque.redis.sadd 'queues', worker_queue_with_priority
  end
end
def send_stop_signal
Resque.redis.setex(
"#{service_queue}::live-inspector",

View File

@ -9,13 +9,23 @@ module AbfWorker
result[:rpm][:workers] += nodes[:systems]
result[:rpm][:build_tasks] += nodes[:busy]
result[:rpm][:other_workers] = nodes[:others]
external_bls = BuildList.for_status(BuildList::BUILD_PENDING).external_nodes(:everything).count
result[:rpm][:default_tasks] += external_bls
result[:rpm][:tasks] += external_bls
result[:rpm][:default_tasks] += external_bls + count_of_tasks('user_build_')
mass_build_tasks = count_of_tasks('mass_build_')
result[:rpm][:low_tasks] += mass_build_tasks
result[:rpm][:tasks] += external_bls + mass_build_tasks
result
end
end
def count_of_tasks(regexp)
Resque.redis.smembers('queues').
select{ |q| q =~ /#{regexp}/ }.
map{ |q| Resque.redis.llen("queue:#{q}") }.sum
end
def products_status
get_status(:iso) { |w, worker|
str = w.to_s

View File

@ -68,7 +68,7 @@ Capistrano::Configuration.instance(:must_exist).load do
def start_scheduler
pid = "#{fetch :current_path}/tmp/pids/scheduler.pid"
run "cd #{fetch :current_path} && #{rails_env} PIDFILE=#{pid} BACKGROUND=yes VERBOSE=1 MUTE=1 bundle exec rake resque:scheduler"
run "cd #{fetch :current_path} && #{rails_env} PIDFILE=#{pid} BACKGROUND=yes VERBOSE=1 MUTE=1 RESQUE_SCHEDULER_INTERVAL=0.5 bundle exec rake resque:scheduler"
end
def stop_scheduler

View File

@ -7,4 +7,17 @@ namespace :resque do
end
system("kill -QUIT #{pids.join(' ')}") if pids.size > 0
end
# This fixes connection fail with Postgres server on new fork:
task setup: :environment do
Resque.after_fork do
Resque.redis.client.reconnect
end
Resque.before_fork = Proc.new { ActiveRecord::Base.establish_connection }
end
task scheduler_setup: :environment do
Resque.schedule = YAML.load_file(Rails.root.join('config', 'resque_schedule.yml'))
end
end