#403: move abf_worker/*_observer to app/jobs/abf_worker/

This commit is contained in:
Vokhmin Alexey V 2014-06-20 22:02:32 +04:00
parent 8b90aec335
commit 4a40764173
11 changed files with 68 additions and 75 deletions

View File

@ -78,7 +78,7 @@ class Platforms::ProductBuildListsController < Platforms::BaseController
@product_build_lists = @product_build_lists.for_status(params[:status]) if params[:status].present?
end
@product_build_lists = @product_build_lists.recent.paginate page: params[:page]
@build_server_status = AbfWorker::StatusInspector.products_status
@build_server_status = AbfWorkerStatusPresenter.new.products_status
end
protected

View File

@ -38,7 +38,7 @@ class Projects::BuildListsController < Projects::BaseController
project: :project_statistics
)
@build_server_status = AbfWorker::StatusInspector.projects_status
@build_server_status = AbfWorkerStatusPresenter.new.projects_status
end
end
end

View File

@ -1,7 +1,7 @@
class BuildList < ActiveRecord::Base
include CommitAndVersion
include FileStoreClean
include AbfWorker::ModelHelper
include AbfWorkerMethods
include Feed::BuildList
include BuildListObserver
include EventLoggable

View File

@ -1,15 +1,9 @@
module AbfWorker::ModelHelper
# In model which contains this helper should be:
# - #abf_worker_args
# - #build_canceled
module AbfWorkerMethods
extend ActiveSupport::Concern
MASS_BUILDS_SET = 'abf-worker::mass-builds'
USER_BUILDS_SET = 'abf-worker::user-builds'
def self.included(base)
base.extend(ClassMethods)
end
module ClassMethods
def log_server
@log_server ||= Redis.new(
@ -18,10 +12,9 @@ module AbfWorker::ModelHelper
)
end
def self.next_build
def next_build
raise NotImplementedError
end
end
def abf_worker_log
@ -106,4 +99,5 @@ module AbfWorker::ModelHelper
"abfworker::#{abf_worker_base_queue.gsub(/\_/, '-')}-#{id}"
end
end

View File

@ -3,7 +3,7 @@ class ProductBuildList < ActiveRecord::Base
include TimeLiving
include FileStoreClean
include UrlHelper
include AbfWorker::ModelHelper
include AbfWorkerMethods
include EventLoggable
delegate :url_helpers, to: 'Rails.application.routes'

View File

@ -0,0 +1,59 @@
class AbfWorkerStatusPresenter < ApplicationPresenter
def initialize
end
def projects_status
Rails.cache.fetch([AbfWorkerStatusPresenter, :projects_status], expires_in: 10.seconds) do
result = get_status(:rpm, :publish) { |w, worker| w.to_s =~ /#{worker}_worker_default/ }
nodes = RpmBuildNode.total_statistics
result[:rpm][:workers] += nodes[:systems]
result[:rpm][:build_tasks] += nodes[:busy]
result[:rpm][:other_workers] = nodes[:others]
external_bls = BuildList.for_status(BuildList::BUILD_PENDING).external_nodes(:everything).count
result[:rpm][:default_tasks] += external_bls + count_of_tasks('user_build_')
mass_build_tasks = count_of_tasks('mass_build_')
result[:rpm][:low_tasks] += mass_build_tasks
result[:rpm][:tasks] += external_bls + mass_build_tasks
result
end
end
def products_status
get_status(:iso) { |w, worker|
str = w.to_s
str =~ /iso_worker/ && str !~ /observer/
}
end
protected
def count_of_tasks(regexp)
Redis.current.smembers('resque:queues').
select{ |q| q =~ /#{regexp}/ }.
map{ |q| Redis.current.llen("resque:queue:#{q}") }.sum
end
def get_status(*queues)
status = {}
queues.each do |worker|
workers = Resque.workers.select{ |w| yield w, worker }
status[worker] = status_of_worker workers, worker
end
status
end
def status_of_worker(workers, worker)
key = "resque:queue:#{worker}_worker"
default_tasks, tasks = Redis.current.llen("#{key}_default"), Redis.current.llen(key)
{
workers: workers.count,
build_tasks: workers.select{ |w| w.working? }.count,
default_tasks: default_tasks,
low_tasks: tasks,
tasks: (default_tasks + tasks)
}
end
end

View File

@ -1,60 +0,0 @@
module AbfWorker
class StatusInspector
class << self
def projects_status
Rails.cache.fetch([AbfWorker::StatusInspector, :projects_status], expires_in: 10.seconds) do
result = get_status(:rpm, :publish) { |w, worker| w.to_s =~ /#{worker}_worker_default/ }
nodes = RpmBuildNode.total_statistics
result[:rpm][:workers] += nodes[:systems]
result[:rpm][:build_tasks] += nodes[:busy]
result[:rpm][:other_workers] = nodes[:others]
external_bls = BuildList.for_status(BuildList::BUILD_PENDING).external_nodes(:everything).count
result[:rpm][:default_tasks] += external_bls + count_of_tasks('user_build_')
mass_build_tasks = count_of_tasks('mass_build_')
result[:rpm][:low_tasks] += mass_build_tasks
result[:rpm][:tasks] += external_bls + mass_build_tasks
result
end
end
def count_of_tasks(regexp)
Redis.current.smembers('resque:queues').
select{ |q| q =~ /#{regexp}/ }.
map{ |q| Redis.current.llen("resque:queue:#{q}") }.sum
end
def products_status
get_status(:iso) { |w, worker|
str = w.to_s
str =~ /iso_worker/ && str !~ /observer/
}
end
protected
def get_status(*queues)
status = {}
queues.each do |worker|
workers = Resque.workers.select{ |w| yield w, worker }
status[worker] = status_of_worker workers, worker
end
status
end
def status_of_worker(workers, worker)
key = "resque:queue:#{worker}_worker"
default_tasks, tasks = Redis.current.llen("#{key}_default"), Redis.current.llen(key)
{
workers: workers.count,
build_tasks: workers.select{ |w| w.working? }.count,
default_tasks: default_tasks,
low_tasks: tasks,
tasks: (default_tasks + tasks)
}
end
end
end
end