#403: updated structure of jobs

This commit is contained in:
Vokhmin Alexey V 2014-06-19 00:40:22 +04:00
parent 32c03780ba
commit c37cf206ab
18 changed files with 316 additions and 272 deletions

View File

@ -59,7 +59,7 @@ class Projects::BuildListsController < Projects::BaseController
if build_list.save_to_platform.personal?
raise CanCan::AccessDenied
else
Resque.enqueue(RunBuildListsJob, build_list.id, current_user.id, params[:project_id])
Resque.enqueue(BuildLists::DependentPackagesJob, build_list.id, current_user.id, params[:project_id])
flash[:notice] = t('flash.build_list.run_build_lists_job_added_to_queue')
redirect_to build_list_path(build_list)

View File

@ -0,0 +1,55 @@
module BuildLists
class CreateContainerJob
@queue = :middle
include AbfWorkerHelper
def self.perform(build_list_id)
build_list = BuildList.find(build_list_id)
platform_path = "#{build_list.save_to_platform.path}/container/#{build_list.id}"
system "rm -rf #{platform_path} && mkdir -p #{platform_path}"
packages = packages_structure
packages[:sources] = build_list.packages.by_package_type('source').pluck(:sha1).compact
packages[:binaries][build_list.arch.name.to_sym] = build_list.packages.by_package_type('binary').pluck(:sha1).compact
distrib_type = build_list.build_for_platform.distrib_type
cmd_params = {
'RELEASED' => false,
'REPOSITORY_NAME' => build_list.save_to_repository.name,
'TYPE' => distrib_type,
'IS_CONTAINER' => true,
'ID' => build_list.id,
'SAVE_TO_PLATFORM' => build_list.save_to_platform.name,
'BUILD_FOR_PLATFORM' => build_list.build_for_platform.name
}.map{ |k, v| "#{k}=#{v}" }.join(' ')
# Low priority
Resque.push(
'publish_worker',
'class' => 'AbfWorker::PublishWorker',
'args' => [{
id: build_list.id,
cmd_params: cmd_params,
main_script: 'build.sh',
rollback_script: 'rollback.sh',
platform: {
platform_path: platform_path,
type: distrib_type,
name: build_list.build_for_platform.name,
arch: build_list.arch.name
},
repository: {id: build_list.save_to_repository_id},
time_living: 9600, # 160 min
packages: packages,
old_packages: packages_structure,
build_list_ids: [build_list.id],
projects_for_cleanup: [],
extra: {create_container: true}
}]
)
end
end
end

View File

@ -0,0 +1,66 @@
module BuildLists
class DependentPackagesJob
@queue = :middle
def self.perform(build_list_id, user_id, project_id = nil)
build_list = BuildList.find(build_list_id)
return if build_list.save_to_platform.personal?
user = User.find(user_id)
ability = Ability.new(user)
return unless ability.can?(:show, build_list)
project = Project.find(project_id) if project_id.present?
return if project && !ability.can?(:write, project)
dependent_packages = build_list.packages.pluck(:dependent_packages).flatten.uniq
project_ids = BuildList::Package.
joins(:build_list).
where(
platform_id: build_list.save_to_platform,
name: dependent_packages,
build_lists: { status: BuildList::BUILD_PUBLISHED }
).reorder(nil).uniq.pluck(:project_id)
return if project && project_ids.exclude?(project.id)
projects = project ? [project] : Project.where(id: project_ids).to_a
projects.each do |project|
next unless ability.can?(:write, project)
build_for_platform = save_to_platform = build_list.build_for_platform
save_to_repository = save_to_platform.repositories.find{ |r| r.projects.exists?(project.id) }
next unless save_to_repository
project_version = project.project_version_for(save_to_platform, build_for_platform)
project.increase_release_tag(project_version, user, "BuildList##{build_list.id}: Increase release tag")
bl = project.build_lists.build
bl.save_to_repository = save_to_repository
bl.priority = user.build_priority # User builds more priority than mass rebuild with zero priority
bl.project_version = project_version
bl.user = user
bl.include_repos = [build_for_platform.repositories.main.first.try(:id)].compact
bl.include_repos |= [save_to_repository.id]
%i(
build_for_platform
arch
update_type
save_to_platform
auto_create_container
extra_build_lists
extra_params
external_nodes
include_testing_subrepository
auto_publish_status
use_cached_chroot
use_extra_tests
group_id
).each { |field| bl.send("#{field}=", build_list.send(field)) }
ability.can?(:create, bl) && bl.save
end
end
end
end

View File

@ -0,0 +1,10 @@
module BuildLists
class PublishTaskManagerJob
@queue = :middle
def self.perform
AbfWorker::BuildListsPublishTaskManager.new.run
end
end
end

View File

@ -0,0 +1,47 @@
module BuildLists
class QueuesMonitoringJob
@queue = :middle
def self.perform
Redis.current.smembers('resque:queues').each do |key|
next if key !~ /(user|mass)_build_/
queue = "resque:queue:#{key}"
id = key.gsub(/[^\d]/, '')
if Redis.current.llen(queue) == 0
if key =~ /^user/
last_updated_at = BuildList.select(:updated_at).
where(user_id: id).order('updated_at DESC').first
else
last_updated_at = MassBuild.select(:updated_at).where(id: 250).first
end
last_updated_at = last_updated_at.try(:updated_at)
# cleans queue if no activity and tasks for this queue
clean(key) if !last_updated_at || (last_updated_at + 5.minutes) < Time.zone.now
else
# ensures that user/mass-build in the set from which we select next jobs
set_key = key =~ /^user/ ? BuildList::USER_BUILDS_SET : BuildList::MASS_BUILDS_SET
Redis.current.sadd set_key, id
end
end
end
def self.clean(key)
queue = "resque:queue:#{key}"
# See [#watch]: https://github.com/redis/redis-rb/blob/master/lib/redis.rb#L2012
Redis.current.watch(queue) do
if Redis.current.llen(queue) == 0
Redis.current.multi do |multi|
multi.del queue
multi.srem 'resque:queues', key
end
else
Redis.current.unwatch
end
end
end
end
end

View File

@ -1,8 +0,0 @@
class BuildListsPublishTaskManagerJob
@queue = :middle
def self.perform
AbfWorker::BuildListsPublishTaskManager.new.run
end
end

View File

@ -1,45 +0,0 @@
class BuildListsQueuesMonitoringJob
@queue = :middle
def self.perform
Redis.current.smembers('resque:queues').each do |key|
next if key !~ /(user|mass)_build_/
queue = "resque:queue:#{key}"
id = key.gsub(/[^\d]/, '')
if Redis.current.llen(queue) == 0
if key =~ /^user/
last_updated_at = BuildList.select(:updated_at).
where(user_id: id).order('updated_at DESC').first
else
last_updated_at = MassBuild.select(:updated_at).where(id: 250).first
end
last_updated_at = last_updated_at.try(:updated_at)
# cleans queue if no activity and tasks for this queue
clean(key) if !last_updated_at || (last_updated_at + 5.minutes) < Time.zone.now
else
# ensures that user/mass-build in the set from which we select next jobs
set_key = key =~ /^user/ ? BuildList::USER_BUILDS_SET : BuildList::MASS_BUILDS_SET
Redis.current.sadd set_key, id
end
end
end
def self.clean(key)
queue = "resque:queue:#{key}"
# See [#watch]: https://github.com/redis/redis-rb/blob/master/lib/redis.rb#L2012
Redis.current.watch(queue) do
if Redis.current.llen(queue) == 0
Redis.current.multi do |multi|
multi.del queue
multi.srem 'resque:queues', key
end
else
Redis.current.unwatch
end
end
end
end

View File

@ -0,0 +1,62 @@
module AbfWorkerHelper
REDIS_MAIN_KEY = 'abf-worker::build-lists-publish-task-manager::'
%w(
PROJECTS_FOR_CLEANUP
LOCKED_PROJECTS_FOR_CLEANUP
LOCKED_BUILD_LISTS
PACKAGES_FOR_CLEANUP
REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING
BUILD_LISTS_FOR_CLEANUP_FROM_TESTING
).each do |kind|
const_set kind, "#{REDIS_MAIN_KEY}#{kind.downcase.gsub('_', '-')}"
end
extend ActiveSupport::Concern
def self.packages_structure
structure = {sources: [], binaries: {}}
Arch.pluck(:name).each{ |name| structure[:binaries][name.to_sym] = [] }
structure
end
def self.fill_packages(bl, results_map, field = :sha1)
results_map[:sources] |= bl.packages.by_package_type('source').pluck(field).compact if field != :sha1
binaries = bl.packages.by_package_type('binary').pluck(field).compact
arch = bl.arch.name.to_sym
results_map[:binaries][arch] |= binaries
# Publish/remove i686 RHEL packages into/from x86_64
if arch == :i586 && bl.build_for_platform.distrib_type == 'rhel' && bl.project.publish_i686_into_x86_64?
results_map[:binaries][:x86_64] |= binaries
end
end
def self.cleanup_completed(projects_for_cleanup)
projects_for_cleanup.each do |key|
Redis.current.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key
Redis.current.hdel PACKAGES_FOR_CLEANUP, key
end
end
def self.cleanup_failed(projects_for_cleanup)
projects_for_cleanup.each do |key|
Redis.current.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key
Redis.current.lpush PROJECTS_FOR_CLEANUP, key
end
end
def self.cleanup_packages_from_testing(platform_id, repository_id, *build_lists)
return if build_lists.blank?
rep_pl = "#{repository_id}-#{platform_id}"
key = "#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{rep_pl}"
Redis.current.sadd REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING, rep_pl
Redis.current.sadd key, build_lists
end
def self.unlock_build_list(build_list)
Redis.current.lrem LOCKED_BUILD_LISTS, 0, build_list.id
end
end

View File

@ -0,0 +1,53 @@
class DestroyProjectFromRepositoryJob
@queue = :middle
include AbfWorkerHelper
def self.perform(project_id, repository_id)
project = Project.find(project_id)
repository = Repository.find(repository_id)
if repository.platform.personal?
Platform.main.each do |main_platform|
key = "#{project.id}-#{repository.id}-#{main_platform.id}"
Redis.current.lpush PROJECTS_FOR_CLEANUP, key
gather_old_packages project.id, repository.id, main_platform.id
Redis.current.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key)
gather_old_packages project.id, repository.id, main_platform.id, true
end
else
key = "#{project.id}-#{repository.id}-#{repository.platform.id}"
Redis.current.lpush PROJECTS_FOR_CLEANUP, key
gather_old_packages project.id, repository.id, repository.platform.id
Redis.current.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key)
gather_old_packages project.id, repository.id, repository.platform.id, true
end
end
def self.gather_old_packages(project_id, repository_id, platform_id, testing = false)
build_lists_for_cleanup = []
status = testing ? BuildList::BUILD_PUBLISHED_INTO_TESTING : BuildList::BUILD_PUBLISHED
Arch.pluck(:id).each do |arch_id|
bl = BuildList.where(project_id: project_id).
where(new_core: true, status: status).
where(save_to_repository_id: repository_id).
where(build_for_platform_id: platform_id).
where(arch_id: arch_id).
order(:updated_at).first
build_lists_for_cleanup << bl if bl
end
old_packages = packages_structure
build_lists_for_cleanup.each do |bl|
bl.last_published(testing).includes(:packages).limit(2).each{ |old_bl|
fill_packages(old_bl, old_packages, :fullname)
}
end
key = (testing ? 'testing-' : '') << "#{project_id}-#{repository_id}-#{platform_id}"
Redis.current.hset PACKAGES_FOR_CLEANUP, key, old_packages.to_json
end
end

View File

@ -1,64 +0,0 @@
class RunBuildListsJob
@queue = :middle
def self.perform(build_list_id, user_id, project_id = nil)
build_list = BuildList.find(build_list_id)
return if build_list.save_to_platform.personal?
user = User.find(user_id)
ability = Ability.new(user)
return unless ability.can?(:show, build_list)
project = Project.find(project_id) if project_id.present?
return if project && !ability.can?(:write, project)
dependent_packages = build_list.packages.pluck(:dependent_packages).flatten.uniq
project_ids = BuildList::Package.
joins(:build_list).
where(
platform_id: build_list.save_to_platform,
name: dependent_packages,
build_lists: { status: BuildList::BUILD_PUBLISHED }
).reorder(nil).uniq.pluck(:project_id)
return if project && project_ids.exclude?(project.id)
projects = project ? [project] : Project.where(id: project_ids).to_a
projects.each do |project|
next unless ability.can?(:write, project)
build_for_platform = save_to_platform = build_list.build_for_platform
save_to_repository = save_to_platform.repositories.find{ |r| r.projects.exists?(project.id) }
next unless save_to_repository
project_version = project.project_version_for(save_to_platform, build_for_platform)
project.increase_release_tag(project_version, user, "BuildList##{build_list.id}: Increase release tag")
bl = project.build_lists.build
bl.save_to_repository = save_to_repository
bl.priority = user.build_priority # User builds more priority than mass rebuild with zero priority
bl.project_version = project_version
bl.user = user
bl.include_repos = [build_for_platform.repositories.main.first.try(:id)].compact
bl.include_repos |= [save_to_repository.id]
%i(
build_for_platform
arch
update_type
save_to_platform
auto_create_container
extra_build_lists
extra_params
external_nodes
include_testing_subrepository
auto_publish_status
use_cached_chroot
use_extra_tests
group_id
).each { |field| bl.send("#{field}=", build_list.send(field)) }
ability.can?(:create, bl) && bl.save
end
end
end

View File

@ -578,7 +578,7 @@ class BuildList < ActiveRecord::Base
end
def cleanup_packages_from_testing
AbfWorker::BuildListsPublishTaskManager.cleanup_packages_from_testing(
AbfWorkerHelper.cleanup_packages_from_testing(
build_for_platform_id,
save_to_repository_id,
id
@ -623,7 +623,7 @@ class BuildList < ActiveRecord::Base
protected
def create_container
AbfWorker::BuildListsPublishTaskManager.create_container_for self
Resque.enqueue(BuildLists::CreateContainerJob, id)
end
def remove_container

View File

@ -224,10 +224,6 @@ class Project < ActiveRecord::Base
end
end
def destroy_project_from_repository(repository)
AbfWorker::BuildListsPublishTaskManager.destroy_project_from_repository self, repository
end
def default_head treeish = nil # maybe need change 'head'?
# Attention!
# repo.commit(nil) => <Grit::Commit "b6c0f81deb17590d22fc07ba0bbd4aa700256f61">

View File

@ -8,7 +8,7 @@ class ProjectToRepository < ActiveRecord::Base
scope :autostart_enabled, -> { where("autostart_options -> 'enabled' = 'true'") }
after_destroy -> { project.destroy_project_from_repository(repository) }, unless: -> { Thread.current[:skip] }
after_destroy :destroy_project_from_repository, unless: -> { Thread.current[:skip] }
validate :one_project_in_platform_repositories, on: :create
@ -28,6 +28,10 @@ class ProjectToRepository < ActiveRecord::Base
protected
def destroy_project_from_repository
Resque.enqueue(DestroyProjectFromRepositoryJob, project_id, repository_id)
end
def one_project_in_platform_repositories
if Project.joins(repositories: :platform).where('platforms.id = ?', repository.platform_id).by_name(project.name).exists?
errors.add(:base, I18n.t('activerecord.errors.project_to_repository.project'))

View File

@ -32,6 +32,7 @@ module Rosa
# config.autoload_paths += %W(#{config.root}/extras)
config.autoload_paths += %W(#{config.root}/app/presenters)
config.autoload_paths += %W(#{config.root}/app/jobs)
config.autoload_paths += %W(#{config.root}/app/jobs/concerns)
# Only load the plugins named here, in the order given (default is alphabetical).
# :all can be used as a placeholder for all plugins not explicitly named.

View File

@ -8,14 +8,14 @@ clean_rpm_build_nodes:
build_lists_publish_task_manager:
every:
- '3m'
class: 'BuildListsPublishTaskManagerJob'
class: 'BuildLists::PublishTaskManagerJob'
queue: middle
description: 'Creates tasks for publishing'
build_lists_queues_monitoring:
every:
- '1m'
class: 'BuildListsQueuesMonitoringJob'
class: 'BuildLists::QueuesMonitoringJob'
queue: middle
description: 'Monitoring for "user/mass-build" queues'

View File

@ -23,141 +23,6 @@ module AbfWorker
create_tasks_for_build_rpms true
end
class << self
def destroy_project_from_repository(project, repository)
if repository.platform.personal?
Platform.main.each do |main_platform|
key = "#{project.id}-#{repository.id}-#{main_platform.id}"
Redis.current.lpush PROJECTS_FOR_CLEANUP, key
gather_old_packages project.id, repository.id, main_platform.id
Redis.current.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key)
gather_old_packages project.id, repository.id, main_platform.id, true
end
else
key = "#{project.id}-#{repository.id}-#{repository.platform.id}"
Redis.current.lpush PROJECTS_FOR_CLEANUP, key
gather_old_packages project.id, repository.id, repository.platform.id
Redis.current.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key)
gather_old_packages project.id, repository.id, repository.platform.id, true
end
end
def cleanup_completed(projects_for_cleanup)
projects_for_cleanup.each do |key|
Redis.current.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key
Redis.current.hdel PACKAGES_FOR_CLEANUP, key
end
end
def cleanup_failed(projects_for_cleanup)
projects_for_cleanup.each do |key|
Redis.current.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key
Redis.current.lpush PROJECTS_FOR_CLEANUP, key
end
end
def cleanup_packages_from_testing(platform_id, repository_id, *build_lists)
return if build_lists.blank?
rep_pl = "#{repository_id}-#{platform_id}"
key = "#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{rep_pl}"
Redis.current.sadd REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING, rep_pl
Redis.current.sadd key, build_lists
end
def unlock_build_list(build_list)
Redis.current.lrem LOCKED_BUILD_LISTS, 0, build_list.id
end
def packages_structure
structure = {sources: [], binaries: {}}
Arch.pluck(:name).each{ |name| structure[:binaries][name.to_sym] = [] }
structure
end
def create_container_for(build_list)
platform_path = "#{build_list.save_to_platform.path}/container/#{build_list.id}"
system "rm -rf #{platform_path} && mkdir -p #{platform_path}"
packages = packages_structure
packages[:sources] = build_list.packages.by_package_type('source').pluck(:sha1).compact
packages[:binaries][build_list.arch.name.to_sym] = build_list.packages.by_package_type('binary').pluck(:sha1).compact
distrib_type = build_list.build_for_platform.distrib_type
cmd_params = {
'RELEASED' => false,
'REPOSITORY_NAME' => build_list.save_to_repository.name,
'TYPE' => distrib_type,
'IS_CONTAINER' => true,
'ID' => build_list.id,
'SAVE_TO_PLATFORM' => build_list.save_to_platform.name,
'BUILD_FOR_PLATFORM' => build_list.build_for_platform.name
}.map{ |k, v| "#{k}=#{v}" }.join(' ')
# Low priority
Resque.push(
'publish_worker',
'class' => 'AbfWorker::PublishWorker',
'args' => [{
id: build_list.id,
cmd_params: cmd_params,
main_script: 'build.sh',
rollback_script: 'rollback.sh',
platform: {
platform_path: platform_path,
type: distrib_type,
name: build_list.build_for_platform.name,
arch: build_list.arch.name
},
repository: {id: build_list.save_to_repository_id},
time_living: 9600, # 160 min
packages: packages,
old_packages: packages_structure,
build_list_ids: [build_list.id],
projects_for_cleanup: [],
extra: {create_container: true}
}]
)
end
def gather_old_packages(project_id, repository_id, platform_id, testing = false)
build_lists_for_cleanup = []
status = testing ? BuildList::BUILD_PUBLISHED_INTO_TESTING : BuildList::BUILD_PUBLISHED
Arch.pluck(:id).each do |arch_id|
bl = BuildList.where(project_id: project_id).
where(new_core: true, status: status).
where(save_to_repository_id: repository_id).
where(build_for_platform_id: platform_id).
where(arch_id: arch_id).
order(:updated_at).first
build_lists_for_cleanup << bl if bl
end
old_packages = packages_structure
build_lists_for_cleanup.each do |bl|
bl.last_published(testing).includes(:packages).limit(2).each{ |old_bl|
fill_packages(old_bl, old_packages, :fullname)
}
end
key = (testing ? 'testing-' : '') << "#{project_id}-#{repository_id}-#{platform_id}"
Redis.current.hset PACKAGES_FOR_CLEANUP, key, old_packages.to_json
end
def fill_packages(bl, results_map, field = :sha1)
results_map[:sources] |= bl.packages.by_package_type('source').pluck(field).compact if field != :sha1
binaries = bl.packages.by_package_type('binary').pluck(field).compact
arch = bl.arch.name.to_sym
results_map[:binaries][arch] |= binaries
# Publish/remove i686 RHEL packages into/from x86_64
if arch == :i586 && bl.build_for_platform.distrib_type == 'rhel' && bl.project.publish_i686_into_x86_64?
results_map[:binaries][:x86_64] |= binaries
end
end
end
private
def create_tasks_for_resign_repositories

View File

@ -38,7 +38,7 @@ module AbfWorker
elsif !extra['resign'] # Simple publish
bls = extra['build_lists_for_cleanup_from_testing']
if status != COMPLETED && bls.present?
AbfWorker::BuildListsPublishTaskManager.cleanup_packages_from_testing(
AbfWorkerHelper.cleanup_packages_from_testing(
repository_status.platform_id,
repository_status.repository_id,
bls
@ -72,14 +72,14 @@ module AbfWorker
build_list.fail_publish_into_testing || build_list.update_column(:status, BuildList::FAILED_PUBLISH_INTO_TESTING)
end
end
AbfWorker::BuildListsPublishTaskManager.unlock_build_list build_list
AbfWorkerHelper.unlock_build_list build_list
end
case status
when COMPLETED
AbfWorker::BuildListsPublishTaskManager.cleanup_completed options['projects_for_cleanup']
AbfWorkerHelper.cleanup_completed options['projects_for_cleanup']
when FAILED, CANCELED
AbfWorker::BuildListsPublishTaskManager.cleanup_failed options['projects_for_cleanup']
AbfWorkerHelper.cleanup_failed options['projects_for_cleanup']
end
end

View File

@ -1,6 +1,6 @@
require 'spec_helper'
describe RunBuildListsJob do
describe BuildLists::DependentPackagesJob do
let(:build_list) { FactoryGirl.build(:build_list, id: 123) }
let(:user) { build_list.user }
let(:project) { build_list.project }
@ -18,36 +18,38 @@ describe RunBuildListsJob do
allow(ability).to receive(:can?).with(:create, anything).and_return(true)
end
subject { BuildLists::DependentPackagesJob }
it 'ensures that not raises error' do
expect do
RunBuildListsJob.perform build_list.id, user.id
subject.perform build_list.id, user.id
end.to_not raise_exception
end
it 'ensures that creates build_list' do
expect do
RunBuildListsJob.perform build_list.id, user.id
subject.perform build_list.id, user.id
end.to change(BuildList, :count).by(1)
end
it 'ensures that do nothing if user has no access for show of build_list' do
allow(ability).to receive(:can?).with(:show, build_list).and_return(false)
expect do
RunBuildListsJob.perform build_list.id, user.id
subject.perform build_list.id, user.id
end.to change(BuildList, :count).by(0)
end
it 'ensures that do nothing if user has no access for write of project' do
allow(ability).to receive(:can?).with(:write, project).and_return(false)
expect do
RunBuildListsJob.perform build_list.id, user.id
subject.perform build_list.id, user.id
end.to change(BuildList, :count).by(0)
end
it 'ensures that do nothing if user has no access for create of build_list' do
allow(ability).to receive(:can?).with(:create, anything).and_return(false)
expect do
RunBuildListsJob.perform build_list.id, user.id
subject.perform build_list.id, user.id
end.to change(BuildList, :count).by(0)
end