#403: added abf_worker_service, specs

This commit is contained in:
Vokhmin Alexey V 2014-06-19 23:35:53 +04:00
parent c37cf206ab
commit 9d40bfc291
21 changed files with 743 additions and 405 deletions

View File

@ -2,53 +2,10 @@ module BuildLists
class CreateContainerJob class CreateContainerJob
@queue = :middle @queue = :middle
include AbfWorkerHelper
def self.perform(build_list_id) def self.perform(build_list_id)
build_list = BuildList.find(build_list_id) build_list = BuildList.find(build_list_id)
container = AbfWorkerService::Container.new(build_list)
platform_path = "#{build_list.save_to_platform.path}/container/#{build_list.id}" container.create!
system "rm -rf #{platform_path} && mkdir -p #{platform_path}"
packages = packages_structure
packages[:sources] = build_list.packages.by_package_type('source').pluck(:sha1).compact
packages[:binaries][build_list.arch.name.to_sym] = build_list.packages.by_package_type('binary').pluck(:sha1).compact
distrib_type = build_list.build_for_platform.distrib_type
cmd_params = {
'RELEASED' => false,
'REPOSITORY_NAME' => build_list.save_to_repository.name,
'TYPE' => distrib_type,
'IS_CONTAINER' => true,
'ID' => build_list.id,
'SAVE_TO_PLATFORM' => build_list.save_to_platform.name,
'BUILD_FOR_PLATFORM' => build_list.build_for_platform.name
}.map{ |k, v| "#{k}=#{v}" }.join(' ')
# Low priority
Resque.push(
'publish_worker',
'class' => 'AbfWorker::PublishWorker',
'args' => [{
id: build_list.id,
cmd_params: cmd_params,
main_script: 'build.sh',
rollback_script: 'rollback.sh',
platform: {
platform_path: platform_path,
type: distrib_type,
name: build_list.build_for_platform.name,
arch: build_list.arch.name
},
repository: {id: build_list.save_to_repository_id},
time_living: 9600, # 160 min
packages: packages,
old_packages: packages_structure,
build_list_ids: [build_list.id],
projects_for_cleanup: [],
extra: {create_container: true}
}]
)
end end
end end

View File

@ -1,10 +0,0 @@
module BuildLists
class PublishTaskManagerJob
@queue = :middle
def self.perform
AbfWorker::BuildListsPublishTaskManager.new.run
end
end
end

View File

@ -1,62 +0,0 @@
module AbfWorkerHelper
REDIS_MAIN_KEY = 'abf-worker::build-lists-publish-task-manager::'
%w(
PROJECTS_FOR_CLEANUP
LOCKED_PROJECTS_FOR_CLEANUP
LOCKED_BUILD_LISTS
PACKAGES_FOR_CLEANUP
REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING
BUILD_LISTS_FOR_CLEANUP_FROM_TESTING
).each do |kind|
const_set kind, "#{REDIS_MAIN_KEY}#{kind.downcase.gsub('_', '-')}"
end
extend ActiveSupport::Concern
def self.packages_structure
structure = {sources: [], binaries: {}}
Arch.pluck(:name).each{ |name| structure[:binaries][name.to_sym] = [] }
structure
end
def self.fill_packages(bl, results_map, field = :sha1)
results_map[:sources] |= bl.packages.by_package_type('source').pluck(field).compact if field != :sha1
binaries = bl.packages.by_package_type('binary').pluck(field).compact
arch = bl.arch.name.to_sym
results_map[:binaries][arch] |= binaries
# Publish/remove i686 RHEL packages into/from x86_64
if arch == :i586 && bl.build_for_platform.distrib_type == 'rhel' && bl.project.publish_i686_into_x86_64?
results_map[:binaries][:x86_64] |= binaries
end
end
def self.cleanup_completed(projects_for_cleanup)
projects_for_cleanup.each do |key|
Redis.current.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key
Redis.current.hdel PACKAGES_FOR_CLEANUP, key
end
end
def self.cleanup_failed(projects_for_cleanup)
projects_for_cleanup.each do |key|
Redis.current.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key
Redis.current.lpush PROJECTS_FOR_CLEANUP, key
end
end
def self.cleanup_packages_from_testing(platform_id, repository_id, *build_lists)
return if build_lists.blank?
rep_pl = "#{repository_id}-#{platform_id}"
key = "#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{rep_pl}"
Redis.current.sadd REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING, rep_pl
Redis.current.sadd key, build_lists
end
def self.unlock_build_list(build_list)
Redis.current.lrem LOCKED_BUILD_LISTS, 0, build_list.id
end
end

View File

@ -1,53 +1,12 @@
class DestroyProjectFromRepositoryJob class DestroyProjectFromRepositoryJob
@queue = :middle @queue = :middle
include AbfWorkerHelper
def self.perform(project_id, repository_id) def self.perform(project_id, repository_id)
project = Project.find(project_id) project = Project.find(project_id)
repository = Repository.find(repository_id) repository = Repository.find(repository_id)
if repository.platform.personal? service = AbfWorkerService::Repository.new(repository)
Platform.main.each do |main_platform| service.destroy_project!(project)
key = "#{project.id}-#{repository.id}-#{main_platform.id}"
Redis.current.lpush PROJECTS_FOR_CLEANUP, key
gather_old_packages project.id, repository.id, main_platform.id
Redis.current.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key)
gather_old_packages project.id, repository.id, main_platform.id, true
end
else
key = "#{project.id}-#{repository.id}-#{repository.platform.id}"
Redis.current.lpush PROJECTS_FOR_CLEANUP, key
gather_old_packages project.id, repository.id, repository.platform.id
Redis.current.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key)
gather_old_packages project.id, repository.id, repository.platform.id, true
end
end
def self.gather_old_packages(project_id, repository_id, platform_id, testing = false)
build_lists_for_cleanup = []
status = testing ? BuildList::BUILD_PUBLISHED_INTO_TESTING : BuildList::BUILD_PUBLISHED
Arch.pluck(:id).each do |arch_id|
bl = BuildList.where(project_id: project_id).
where(new_core: true, status: status).
where(save_to_repository_id: repository_id).
where(build_for_platform_id: platform_id).
where(arch_id: arch_id).
order(:updated_at).first
build_lists_for_cleanup << bl if bl
end
old_packages = packages_structure
build_lists_for_cleanup.each do |bl|
bl.last_published(testing).includes(:packages).limit(2).each{ |old_bl|
fill_packages(old_bl, old_packages, :fullname)
}
end
key = (testing ? 'testing-' : '') << "#{project_id}-#{repository_id}-#{platform_id}"
Redis.current.hset PACKAGES_FOR_CLEANUP, key, old_packages.to_json
end end
end end

View File

@ -0,0 +1,38 @@
class PublishTaskManagerJob
@queue = :middle
def self.perform
regenerate_metadata_for_software_center
resign_repositories
regenerate_metadata
AbfWorkerService::Rpm.new.publish!
end
protected
def regenerate_metadata_for_software_center
Platform.main.waiting_for_regeneration.each do |platform|
AbfWorkerService::PlatformMetadata.new(platform).regenerate!
end
end
def resign_repositories
statuses = RepositoryStatus.platform_ready.
for_resign.includes(repository: :platform).readonly(false)
statuses.each do |repository_status|
AbfWorkerService::Repository.new(repository_status.repository).resign!(repository_status)
end
end
def regenerate_metadata
statuses = RepositoryStatus.platform_ready.
for_regeneration.includes(repository: :platform).readonly(false)
statuses.each do |repository_status|
AbfWorkerService::RepositoryMetadata.new(repository_status).regenerate!
end
end
end

View File

@ -578,7 +578,7 @@ class BuildList < ActiveRecord::Base
end end
def cleanup_packages_from_testing def cleanup_packages_from_testing
AbfWorkerHelper.cleanup_packages_from_testing( AbfWorkerService::Base.cleanup_packages_from_testing(
build_for_platform_id, build_for_platform_id,
save_to_repository_id, save_to_repository_id,
id id
@ -627,7 +627,7 @@ class BuildList < ActiveRecord::Base
end end
def remove_container def remove_container
system "rm -rf #{save_to_platform.path}/container/#{id}" if save_to_platform AbfWorkerService::Container.new(self).destroy! if save_to_platform
end end
def abf_worker_priority def abf_worker_priority

View File

@ -0,0 +1,64 @@
module AbfWorkerService
class Base
REDIS_MAIN_KEY = 'abf-worker::build-lists-publish-task-manager::'
%w(
PROJECTS_FOR_CLEANUP
LOCKED_PROJECTS_FOR_CLEANUP
LOCKED_BUILD_LISTS
PACKAGES_FOR_CLEANUP
REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING
BUILD_LISTS_FOR_CLEANUP_FROM_TESTING
).each do |kind|
const_set kind, "#{REDIS_MAIN_KEY}#{kind.downcase.gsub('_', '-')}"
end
def self.cleanup_completed(projects_for_cleanup)
projects_for_cleanup.each do |key|
Redis.current.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key
Redis.current.hdel PACKAGES_FOR_CLEANUP, key
end
end
def self.cleanup_failed(projects_for_cleanup)
projects_for_cleanup.each do |key|
Redis.current.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key
Redis.current.lpush PROJECTS_FOR_CLEANUP, key
end
end
def self.cleanup_packages_from_testing(platform_id, repository_id, *build_lists)
return if build_lists.blank?
rep_pl = "#{repository_id}-#{platform_id}"
key = "#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{rep_pl}"
Redis.current.sadd REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING, rep_pl
Redis.current.sadd key, build_lists
end
def self.unlock_build_list(build_list)
Redis.current.lrem LOCKED_BUILD_LISTS, 0, build_list.id
end
protected
def packages_structure
structure = {sources: [], binaries: {}}
Arch.pluck(:name).each{ |name| structure[:binaries][name.to_sym] = [] }
structure
end
def fill_packages(bl, results_map, field = :sha1)
results_map[:sources] |= bl.packages.by_package_type('source').pluck(field).compact if field != :sha1
binaries = bl.packages.by_package_type('binary').pluck(field).compact
arch = bl.arch.name.to_sym
results_map[:binaries][arch] |= binaries
# Publish/remove i686 RHEL packages into/from x86_64
if arch == :i586 && bl.build_for_platform.distrib_type == 'rhel' && bl.project.publish_i686_into_x86_64?
results_map[:binaries][:x86_64] |= binaries
end
end
end
end

View File

@ -0,0 +1,75 @@
module AbfWorkerService
class Container < Base
attr_accessor :build_list
def initialize(build_list)
@build_list = build_list
end
def create!
cleanup_folder
Resque.push(
'publish_worker', # Low priority
'class' => 'AbfWorker::PublishWorker',
'args' => [{
id: build_list.id,
cmd_params: cmd_params,
main_script: 'build.sh',
rollback_script: 'rollback.sh',
platform: {
platform_path: platform_path,
type: distrib_type,
name: build_list.build_for_platform.name,
arch: build_list.arch.name
},
repository: {id: build_list.save_to_repository_id},
time_living: 9600, # 160 min
packages: packages,
old_packages: packages_structure,
build_list_ids: [build_list.id],
projects_for_cleanup: [],
extra: {create_container: true}
}]
)
end
def destroy!
system "rm -rf #{platform_path}"
end
protected
def cmd_params
{
'RELEASED' => false,
'REPOSITORY_NAME' => build_list.save_to_repository.name,
'TYPE' => distrib_type,
'IS_CONTAINER' => true,
'ID' => build_list.id,
'SAVE_TO_PLATFORM' => build_list.save_to_platform.name,
'BUILD_FOR_PLATFORM' => build_list.build_for_platform.name
}.map{ |k, v| "#{k}=#{v}" }.join(' ')
end
def cleanup_folder
system "rm -rf #{platform_path} && mkdir -p #{platform_path}"
end
def platform_path
@platform_path ||= "#{build_list.save_to_platform.path}/container/#{build_list.id}"
end
def distrib_type
@distrib_type ||= build_list.build_for_platform.distrib_type
end
def packages
structure = packages_structure
structure[:sources] = build_list.packages.by_package_type('source').pluck(:sha1).compact
structure[:binaries][build_list.arch.name.to_sym] = build_list.packages.by_package_type('binary').pluck(:sha1).compact
structure
end
end
end

View File

@ -0,0 +1,59 @@
module AbfWorkerService
class PlatformMetadata < Base
attr_accessor :platform
def initialize(platform)
@platform = platform
end
def regenerate!
return unless can_regenerate?(platform)
Resque.push(
'publish_worker_default',
'class' => 'AbfWorker::PublishWorkerDefault',
'args' => [{
id: Time.now.to_i,
cmd_params: cmd_params(platform),
main_script: 'regenerate_platform_metadata.sh',
platform: {
platform_path: "#{platform.path}/repository",
type: platform.distrib_type,
name: platform.name,
arch: 'x86_64'
},
time_living: 9600, # 160 min
extra: {platform_id: platform.id, regenerate_platform: true}
}]
) if platform.start_regeneration
end
protected
def can_regenerate?
repos = platform.repositories
return false if repos.find{ |r| r.repo_lock_file_exists? }
statuses = RepositoryStatus.where(platform_id: platform.id)
return true if statuses.blank?
statuses = statuses.map do |s|
s.ready? || s.can_start_regeneration? || s.can_start_resign?
end.uniq
statuses == [true]
end
def cmd_params
{
'RELEASED' => platform.released,
'REPOSITORY_NAMES' => platform.repositories.map(&:name).join(','),
'TYPE' => platform.distrib_type,
'REGENERATE_PLATFORM_METADATA' => true,
'SAVE_TO_PLATFORM' => platform.name,
'BUILD_FOR_PLATFORM' => platform.name
}.map{ |k, v| "#{k}=#{v}" }.join(' ')
end
end
end

View File

@ -0,0 +1,92 @@
module AbfWorkerService
class Repository < Base
attr_accessor :repository
def initialize(repository)
@repository = repository
end
def destroy_project!(project)
if repository.platform.personal?
Platform.main.each do |main_platform|
key = "#{project.id}-#{repository.id}-#{main_platform.id}"
Redis.current.lpush PROJECTS_FOR_CLEANUP, key
gather_old_packages project.id, repository.id, main_platform.id
Redis.current.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key)
gather_old_packages project.id, repository.id, main_platform.id, true
end
else
key = "#{project.id}-#{repository.id}-#{repository.platform_id}"
Redis.current.lpush PROJECTS_FOR_CLEANUP, key
gather_old_packages project.id, repository.id, repository.platform_id
Redis.current.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key)
gather_old_packages project.id, repository.id, repository.platform_id, true
end
end
def resign!(repository_status)
return if repository.repo_lock_file_exists?
Resque.push(
'publish_worker_default',
'class' => "AbfWorker::PublishWorkerDefault",
'args' => [{
id: repository.id,
cmd_params: cmd_params,
main_script: 'resign.sh',
platform: {
platform_path: "#{repository.platform.path}/repository",
type: distrib_type,
name: repository.platform.name,
arch: 'x86_64'
},
repository: {id: repository.id},
skip_feedback: true,
time_living: 9600, # 160 min
extra: {repository_status_id: repository_status.id, resign: true}
}]
) if repository_status.start_resign
end
protected
def cmd_params
{
'RELEASED' => repository.platform.released,
'REPOSITORY_NAME' => repository.name,
'TYPE' => distrib_type
}.map{ |k, v| "#{k}=#{v}" }.join(' ')
end
def distrib_type
@distrib_type ||= repository.platform.distrib_type
end
def gather_old_packages(project_id, repository_id, platform_id, testing = false)
build_lists_for_cleanup = []
status = testing ? BuildList::BUILD_PUBLISHED_INTO_TESTING : BuildList::BUILD_PUBLISHED
Arch.pluck(:id).each do |arch_id|
bl = BuildList.where(project_id: project_id).
where(new_core: true, status: status).
where(save_to_repository_id: repository_id).
where(build_for_platform_id: platform_id).
where(arch_id: arch_id).
order(:updated_at).first
build_lists_for_cleanup << bl if bl
end
old_packages = packages_structure
build_lists_for_cleanup.each do |bl|
bl.last_published(testing).includes(:packages).limit(2).each do |old_bl|
fill_packages(old_bl, old_packages, :fullname)
end
end
key = (testing ? 'testing-' : '') << "#{project_id}-#{repository_id}-#{platform_id}"
Redis.current.hset PACKAGES_FOR_CLEANUP, key, old_packages.to_json
end
end
end

View File

@ -0,0 +1,62 @@
module AbfWorkerService
class RepositoryMetadata < Base
attr_accessor :repository, :repository_status
def initialize(repository_status)
@repository_status = repository_status
@repository = repository_status.repository
end
def regenerate!
# Checks mirror sync status
return if repository.repo_lock_file_exists?
platform_path = "#{repository.platform.path}/repository"
if repository.platform.personal?
platform_path << '/' << build_for_platform.name
system "mkdir -p #{platform_path}"
end
Resque.push(
'publish_worker_default',
'class' => 'AbfWorker::PublishWorkerDefault',
'args' => [{
id: Time.now.to_i,
cmd_params: cmd_params,
main_script: 'build.sh',
rollback_script: 'rollback.sh',
platform: {
platform_path: platform_path,
type: build_for_platform.distrib_type,
name: build_for_platform.name,
arch: 'x86_64'
},
repository: {id: repository.id},
# time_living: 9600, # 160 min
time_living: 14400, # 240 min
extra: {repository_status_id: repository_status.id, regenerate: true}
}]
) if repository_status.start_regeneration
end
protected
def build_for_platform
@build_for_platform ||= repository_status.platform
end
def cmd_params
{
'RELEASED' => repository.platform.released,
'REPOSITORY_NAME' => repository.name,
'TYPE' => build_for_platform.distrib_type,
'REGENERATE_METADATA' => true,
'SAVE_TO_PLATFORM' => repository.platform.name,
'BUILD_FOR_PLATFORM' => build_for_platform.name
}.map{ |k, v| "#{k}=#{v}" }.join(' ')
end
end
end

View File

@ -0,0 +1,205 @@
module AbfWorkerService
class Rpm < Base
WORKERS_COUNT = APP_CONFIG['abf_worker']['publish_workers_count']
def publish!
build_rpms
build_rpms(true)
end
protected
def build_rpms(testing = false)
available_repos = BuildList.
select('MIN(updated_at) as min_updated_at, save_to_repository_id, build_for_platform_id').
where(new_core: true, status: (testing ? BuildList::BUILD_PUBLISH_INTO_TESTING : BuildList::BUILD_PUBLISH)).
group(:save_to_repository_id, :build_for_platform_id).
order('min_updated_at ASC').
limit(WORKERS_COUNT * 2) # because some repos may be locked
locked_rep = RepositoryStatus.not_ready.joins(:platform).
where(platforms: {platform_type: 'main'}).pluck(:repository_id)
available_repos = available_repos.where('save_to_repository_id NOT IN (?)', locked_rep) unless locked_rep.empty?
for_cleanup = Redis.current.lrange(PROJECTS_FOR_CLEANUP, 0, -1).map do |key|
next if testing && key !~ /^testing-/
rep, pl = *key.split('-').last(2)
locked_rep.present? && locked_rep.include?(rep.to_i) ? nil : [rep.to_i, pl.to_i]
end.compact
for_cleanup_from_testing = Redis.current.smembers(REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING).map do |key|
next if Redis.current.scard("#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{key}") == 0
rep, pl = *key.split('-')
locked_rep.present? && locked_rep.include?(rep.to_i) ? nil : [rep.to_i, pl.to_i]
end.compact if testing
for_cleanup_from_testing ||= []
counter = 1
available_repos = available_repos.map{ |bl| [bl.save_to_repository_id, bl.build_for_platform_id] } | for_cleanup | for_cleanup_from_testing
available_repos.each do |save_to_repository_id, build_for_platform_id|
next if RepositoryStatus.not_ready.where(repository_id: save_to_repository_id, platform_id: build_for_platform_id).exists?
break if counter > WORKERS_COUNT
counter += 1 if create_rpm_build_task(save_to_repository_id, build_for_platform_id, testing)
end
end
def create_rpm_build_task(save_to_repository_id, build_for_platform_id, testing)
key = "#{save_to_repository_id}-#{build_for_platform_id}"
projects_for_cleanup = Redis.current.lrange(PROJECTS_FOR_CLEANUP, 0, -1).select do |k|
(testing && k =~ /^testing-[\d]+-#{key}$/) || (!testing && k =~ /^[\d]+-#{key}$/)
end
prepare_build_lists(projects_for_cleanup, save_to_repository_id)
build_lists = find_build_lists(build_for_platform_id, save_to_repository_id, testing)
old_packages = packages_structure
projects_for_cleanup.each do |key|
Redis.current.lrem PROJECTS_FOR_CLEANUP, 0, key
packages = Redis.current.hget PACKAGES_FOR_CLEANUP, key
next unless packages
packages = JSON.parse packages
old_packages[:sources] |= packages['sources']
Arch.pluck(:name).each do |arch|
old_packages[:binaries][arch.to_sym] |= packages['binaries'][arch] || []
end
end
if testing
build_lists_for_cleanup_from_testing = Redis.current.smembers("#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{save_to_repository_id}-#{build_for_platform_id}")
BuildList.where(id: build_lists_for_cleanup_from_testing).each do |b|
fill_packages(b, old_packages, :fullname)
end if build_lists_for_cleanup_from_testing.present?
end
build_lists_for_cleanup_from_testing ||= []
bl = build_lists.first
return false if !bl && old_packages[:sources].empty? && old_packages[:binaries].values.flatten.empty?
save_to_repository = Repository.find save_to_repository_id
# Checks mirror sync status
return false if save_to_repository.repo_lock_file_exists? || !save_to_repository.platform.ready?
repository_status = save_to_repository.repository_statuses.find_or_create_by(platform_id: build_for_platform_id)
return false unless repository_status.publish
save_to_platform = save_to_repository.platform
build_for_platform = Platform.find build_for_platform_id
platform_path = "#{save_to_platform.path}/repository"
if save_to_platform.personal?
platform_path << '/' << build_for_platform.name
system "mkdir -p #{platform_path}"
end
distrib_type = build_for_platform.distrib_type
cmd_params = {
'RELEASED' => save_to_platform.released,
'REPOSITORY_NAME' => save_to_repository.name,
'TYPE' => distrib_type,
'SAVE_TO_PLATFORM' => save_to_platform.name,
'BUILD_FOR_PLATFORM' => build_for_platform.name,
'TESTING' => testing
}.map{ |k, v| "#{k}=#{v}" }.join(' ')
options = {
id: (bl ? bl.id : Time.now.to_i),
cmd_params: cmd_params,
main_script: 'build.sh',
rollback_script: 'rollback.sh',
platform: {
platform_path: platform_path,
type: distrib_type,
name: build_for_platform.name,
arch: (bl ? bl.arch.name : 'x86_64')
},
repository: {id: save_to_repository_id},
time_living: 9600, # 160 min
extra: {
repository_status_id: repository_status.id,
build_lists_for_cleanup_from_testing: build_lists_for_cleanup_from_testing
}
}
packages, build_list_ids, new_sources = fill_in_packages(build_lists, testing)
push(options.merge({
packages: packages,
old_packages: old_packages,
build_list_ids: build_list_ids,
projects_for_cleanup: projects_for_cleanup
}))
lock_projects(projects_for_cleanup)
cleanup(save_to_repository_id, build_for_platform_id, build_lists_for_cleanup_from_testing)
return true
end
def fill_in_packages(build_lists, testing)
packages, build_list_ids, new_sources = packages_structure, [], {}
build_lists.each do |bl|
# remove duplicates of sources for different arches
bl.packages.by_package_type('source').each{ |s| new_sources["#{s.fullname}"] = s.sha1 }
fill_packages(bl, packages)
bl.last_published(testing).includes(:packages).limit(2).each{ |old_bl|
fill_packages(old_bl, old_packages, :fullname)
}
# TODO: do more flexible
# Removes old packages which already in the main repo
bl.last_published(false).includes(:packages).limit(3).each{ |old_bl|
fill_packages(old_bl, old_packages, :fullname)
} if testing
build_list_ids << bl.id
Redis.current.lpush(LOCKED_BUILD_LISTS, bl.id)
end
packages[:sources] = new_sources.values.compact
[packages, build_list_ids, new_sources]
end
def lock_projects(projects_for_cleanup)
projects_for_cleanup.each do |key|
Redis.current.lpush LOCKED_PROJECTS_FOR_CLEANUP, key
end
end
def cleanup(save_to_repository_id, build_for_platform_id, build_lists_for_cleanup_from_testing)
rep_pl = "#{save_to_repository_id}-#{build_for_platform_id}"
r_key = "#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{rep_pl}"
build_lists_for_cleanup_from_testing.each do |key|
Redis.current.srem r_key, key
end
if Redis.current.scard(r_key) == 0
Redis.current.srem REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING, rep_pl
end
end
def push(options)
Resque.push(
'publish_worker_default',
'class' => 'AbfWorker::PublishWorkerDefault',
'args' => [options]
)
end
def prepare_build_lists(projects_for_cleanup, save_to_repository_id)
# We should not to publish new builds into repository
# if project of builds has been removed from repository.
BuildList.where(
project_id: projects_for_cleanup.map{ |k| k.split('-')[testing ? 1 : 0] }.uniq,
save_to_repository_id: save_to_repository_id,
status: [BuildList::BUILD_PUBLISH, BuildList::BUILD_PUBLISH_INTO_TESTING]
).update_all(status: BuildList::FAILED_PUBLISH)
end
def find_build_lists(build_for_platform_id, save_to_repository_id, testing)
build_lists = BuildList.
where(new_core: true, status: (testing ? BuildList::BUILD_PUBLISH_INTO_TESTING : BuildList::BUILD_PUBLISH)).
where(save_to_repository_id: save_to_repository_id).
where(build_for_platform_id: build_for_platform_id).
order(:updated_at)
locked_ids = Redis.current.lrange(LOCKED_BUILD_LISTS, 0, -1)
build_lists = build_lists.where('build_lists.id NOT IN (?)', locked_ids) if locked_ids.present?
build_lists.limit(150)
end
end
end

View File

@ -33,6 +33,7 @@ module Rosa
config.autoload_paths += %W(#{config.root}/app/presenters) config.autoload_paths += %W(#{config.root}/app/presenters)
config.autoload_paths += %W(#{config.root}/app/jobs) config.autoload_paths += %W(#{config.root}/app/jobs)
config.autoload_paths += %W(#{config.root}/app/jobs/concerns) config.autoload_paths += %W(#{config.root}/app/jobs/concerns)
config.autoload_paths += %W(#{config.root}/app/services/abf_worker)
# Only load the plugins named here, in the order given (default is alphabetical). # Only load the plugins named here, in the order given (default is alphabetical).
# :all can be used as a placeholder for all plugins not explicitly named. # :all can be used as a placeholder for all plugins not explicitly named.

View File

@ -8,7 +8,7 @@ clean_rpm_build_nodes:
build_lists_publish_task_manager: build_lists_publish_task_manager:
every: every:
- '3m' - '3m'
class: 'BuildLists::PublishTaskManagerJob' class: 'PublishTaskManagerJob'
queue: middle queue: middle
description: 'Creates tasks for publishing' description: 'Creates tasks for publishing'

View File

@ -16,54 +16,15 @@ module AbfWorker
end end
def run def run
create_tasks_for_regenerate_metadata_for_software_center # create_tasks_for_regenerate_metadata_for_software_center
create_tasks_for_resign_repositories # create_tasks_for_resign_repositories
create_tasks_for_repository_regenerate_metadata # create_tasks_for_repository_regenerate_metadata
create_tasks_for_build_rpms create_tasks_for_build_rpms
create_tasks_for_build_rpms true create_tasks_for_build_rpms true
end end
private private
def create_tasks_for_resign_repositories
RepositoryStatus.platform_ready
.for_resign
.includes(repository: :platform)
.readonly(false)
.each do |repository_status|
r = repository_status.repository
# Checks mirror sync status
next if r.repo_lock_file_exists?
distrib_type = r.platform.distrib_type
cmd_params = {
'RELEASED' => r.platform.released,
'REPOSITORY_NAME' => r.name,
'TYPE' => distrib_type
}.map{ |k, v| "#{k}=#{v}" }.join(' ')
Resque.push(
'publish_worker_default',
'class' => "AbfWorker::PublishWorkerDefault",
'args' => [{
id: r.id,
cmd_params: cmd_params,
main_script: 'resign.sh',
platform: {
platform_path: "#{r.platform.path}/repository",
type: distrib_type,
name: r.platform.name,
arch: 'x86_64'
},
repository: {id: r.id},
skip_feedback: true,
time_living: 9600, # 160 min
extra: {repository_status_id: repository_status.id, resign: true}
}]
) if repository_status.start_resign
end
end
def create_tasks_for_build_rpms(testing = false) def create_tasks_for_build_rpms(testing = false)
available_repos = BuildList. available_repos = BuildList.
select('MIN(updated_at) as min_updated_at, save_to_repository_id, build_for_platform_id'). select('MIN(updated_at) as min_updated_at, save_to_repository_id, build_for_platform_id').
@ -234,91 +195,5 @@ module AbfWorker
return true return true
end end
def create_tasks_for_regenerate_metadata_for_software_center
Platform.main.waiting_for_regeneration.each do |platform|
repos = platform.repositories
statuses = RepositoryStatus.where(platform_id: platform.id)
next if repos.find{ |r| r.repo_lock_file_exists? }
next if statuses.present? &&
statuses.map{ |s| s.ready? || s.can_start_regeneration? || s.can_start_resign? }.uniq != [true]
cmd_params = {
'RELEASED' => platform.released,
'REPOSITORY_NAMES' => platform.repositories.map(&:name).join(','),
'TYPE' => platform.distrib_type,
'REGENERATE_PLATFORM_METADATA' => true,
'SAVE_TO_PLATFORM' => platform.name,
'BUILD_FOR_PLATFORM' => platform.name
}.map{ |k, v| "#{k}=#{v}" }.join(' ')
Resque.push(
'publish_worker_default',
'class' => 'AbfWorker::PublishWorkerDefault',
'args' => [{
id: Time.now.to_i,
cmd_params: cmd_params,
main_script: 'regenerate_platform_metadata.sh',
platform: {
platform_path: "#{platform.path}/repository",
type: platform.distrib_type,
name: platform.name,
arch: 'x86_64'
},
time_living: 9600, # 160 min
extra: {platform_id: platform.id, regenerate_platform: true}
}]
) if platform.start_regeneration
end
end
def create_tasks_for_repository_regenerate_metadata
RepositoryStatus.platform_ready
.for_regeneration
.includes(repository: :platform)
.readonly(false)
.each do |repository_status|
rep = repository_status.repository
# Checks mirror sync status
next if rep.repo_lock_file_exists?
build_for_platform = repository_status.platform
cmd_params = {
'RELEASED' => rep.platform.released,
'REPOSITORY_NAME' => rep.name,
'TYPE' => build_for_platform.distrib_type,
'REGENERATE_METADATA' => true,
'SAVE_TO_PLATFORM' => rep.platform.name,
'BUILD_FOR_PLATFORM' => build_for_platform.name
}.map{ |k, v| "#{k}=#{v}" }.join(' ')
platform_path = "#{rep.platform.path}/repository"
if rep.platform.personal?
platform_path << '/' << build_for_platform.name
system "mkdir -p #{platform_path}"
end
Resque.push(
'publish_worker_default',
'class' => 'AbfWorker::PublishWorkerDefault',
'args' => [{
id: Time.now.to_i,
cmd_params: cmd_params,
main_script: 'build.sh',
rollback_script: 'rollback.sh',
platform: {
platform_path: platform_path,
type: build_for_platform.distrib_type,
name: build_for_platform.name,
arch: 'x86_64'
},
repository: {id: rep.id},
# time_living: 9600, # 160 min
time_living: 14400, # 240 min
extra: {repository_status_id: repository_status.id, regenerate: true}
}]
) if repository_status.start_regeneration
end
end
end end
end end

View File

@ -38,7 +38,7 @@ module AbfWorker
elsif !extra['resign'] # Simple publish elsif !extra['resign'] # Simple publish
bls = extra['build_lists_for_cleanup_from_testing'] bls = extra['build_lists_for_cleanup_from_testing']
if status != COMPLETED && bls.present? if status != COMPLETED && bls.present?
AbfWorkerHelper.cleanup_packages_from_testing( AbfWorkerService::Base.cleanup_packages_from_testing(
repository_status.platform_id, repository_status.platform_id,
repository_status.repository_id, repository_status.repository_id,
bls bls
@ -72,14 +72,14 @@ module AbfWorker
build_list.fail_publish_into_testing || build_list.update_column(:status, BuildList::FAILED_PUBLISH_INTO_TESTING) build_list.fail_publish_into_testing || build_list.update_column(:status, BuildList::FAILED_PUBLISH_INTO_TESTING)
end end
end end
AbfWorkerHelper.unlock_build_list build_list AbfWorkerService::Base.unlock_build_list build_list
end end
case status case status
when COMPLETED when COMPLETED
AbfWorkerHelper.cleanup_completed options['projects_for_cleanup'] AbfWorkerService::Base.cleanup_completed options['projects_for_cleanup']
when FAILED, CANCELED when FAILED, CANCELED
AbfWorkerHelper.cleanup_failed options['projects_for_cleanup'] AbfWorkerService::Base.cleanup_failed options['projects_for_cleanup']
end end
end end

View File

@ -0,0 +1,21 @@
require 'spec_helper'
describe DestroyProjectFromRepositoryJob do
let(:project) { FactoryGirl.build(:project, id: 123) }
let(:repository) { FactoryGirl.build(:repository, id: 234) }
before do
stub_symlink_methods
allow(Project).to receive(:find).with(123).and_return(project)
allow(Repository).to receive(:find).with(234).and_return(repository)
end
subject { DestroyProjectFromRepositoryJob }
it 'ensures that not raises error' do
expect do
subject.perform 123, 234
end.to_not raise_exception
end
end

View File

@ -122,42 +122,6 @@ describe AbfWorker::BuildListsPublishTaskManager do
end end
context 'creates task for removing project from repository' do
before do
build_list.update_column(:status, BuildList::BUILD_PUBLISHED)
FactoryGirl.create(:build_list_package, build_list: build_list)
ProjectToRepository.where(project_id: build_list.project_id, repository_id: build_list.save_to_repository_id).destroy_all
2.times{ subject.new.run }
end
%w(LOCKED_BUILD_LISTS).each do |kind|
it "ensures that no '#{kind.downcase.gsub('_', ' ')}'" do
@redis_instance.lrange(subject.const_get(kind), 0, -1).should be_empty
end
end
it "ensures that has only 'projects for cleanup' for testing subrepo" do
queue = @redis_instance.lrange(subject::PROJECTS_FOR_CLEANUP, 0, -1)
queue.should have(1).item
queue.should include("testing-#{build_list.project_id}-#{build_list.save_to_repository_id}-#{build_list.build_for_platform_id}")
end
it "ensures that only one repository_status has status publish" do
RepositoryStatus.where(status: RepositoryStatus::PUBLISH).should have(1).item
end
it "ensures that 'locked projects for cleanup' has only one item" do
queue = @redis_instance.lrange(subject::LOCKED_PROJECTS_FOR_CLEANUP, 0, -1)
queue.should have(1).item
queue.should include("#{build_list.project_id}-#{build_list.save_to_repository_id}-#{build_list.build_for_platform_id}")
end
it "ensures that new task for publishing has been created" do
@redis_instance.lrange('resque:queue:publish_worker_default', 0, -1).should have(1).item
end
end
context 'grouping build lists for publishing and tasks for removing project from repository' do context 'grouping build lists for publishing and tasks for removing project from repository' do
let(:build_list2) { FactoryGirl.create(:build_list, let(:build_list2) { FactoryGirl.create(:build_list,
new_core: true, new_core: true,
@ -206,65 +170,6 @@ describe AbfWorker::BuildListsPublishTaskManager do
end end
end end
context 'resign packages in repository' do
before do
build_list.update_column(:status, BuildList::BUILD_PUBLISH)
FactoryGirl.create(:key_pair, repository: build_list.save_to_repository)
2.times{ subject.new.run }
end
%w(PROJECTS_FOR_CLEANUP LOCKED_PROJECTS_FOR_CLEANUP LOCKED_BUILD_LISTS).each do |kind|
it "ensure that no '#{kind.downcase.gsub('_', ' ')}'" do
@redis_instance.lrange(subject.const_get(kind), 0, -1).should be_empty
end
end
it "ensures that only one repository_status has status resign" do
RepositoryStatus.where(status: RepositoryStatus::RESIGN).should have(1).item
end
it "ensure that new task for resign has been created" do
@redis_instance.lrange('resque:queue:publish_worker_default', 0, -1).should have(1).item
end
end
context 'regenerate metadata' do
context 'for repository of main platform' do
let(:repository) { FactoryGirl.create(:repository) }
before do
repository.regenerate
subject.new.run
end
it "ensures that only one repository_status has status regenerating" do
RepositoryStatus.where(status: RepositoryStatus::REGENERATING).should have(1).item
end
it 'ensures that new task has been created' do
@redis_instance.lrange('resque:queue:publish_worker_default', 0, -1).should have(1).item
end
end
context 'for repository of personal platform' do
let(:main_platform) { FactoryGirl.create(:platform) }
let(:repository) { FactoryGirl.create(:personal_repository) }
before do
repository.regenerate main_platform.id
subject.new.run
end
it "ensures that only one repository_status has status regenerating" do
RepositoryStatus.where(status: RepositoryStatus::REGENERATING).should have(1).item
end
it 'ensures that new task has been created' do
@redis_instance.lrange('resque:queue:publish_worker_default', 0, -1).should have(1).item
end
end
end
after(:all) do after(:all) do
APP_CONFIG['abf_worker']['publish_workers_count'] = @publish_workers_count APP_CONFIG['abf_worker']['publish_workers_count'] = @publish_workers_count
FileUtils.rm_rf(APP_CONFIG['root_path']) FileUtils.rm_rf(APP_CONFIG['root_path'])

View File

@ -1,26 +1,24 @@
require 'spec_helper' require 'spec_helper'
describe ProjectToRepository do describe ProjectToRepository do
before(:each) do let(:platform) { FactoryGirl.create(:platform) }
let(:first_repo) { FactoryGirl.create(:repository, platform: platform) }
let(:second_repo) { FactoryGirl.create(:repository, platform: platform) }
let(:project) { FactoryGirl.create(:project) }
before do
stub_symlink_methods stub_symlink_methods
@platform = FactoryGirl.create(:platform) first_repo.projects << project
@first_repo = FactoryGirl.create(:repository, platform_id: @platform.id) first_repo.save
@second_repo = FactoryGirl.create(:repository, platform_id: @platform.id)
@project = FactoryGirl.create(:project)
@first_repo.projects << @project
@first_repo.save
end end
it 'should not add the same project in different repositories of same platform' do it 'should not add the same project in different repositories of same platform' do
p2r = @second_repo.project_to_repositories.build project_id: @project.id p2r = second_repo.project_to_repositories.build project: project
p2r.should_not be_valid expect(p2r).to_not be_valid
end end
it 'creates task for removing project from repository on destroy' do it 'creates task for removing project from repository on destroy' do
@first_repo.project_to_repositories.destroy_all expect(Resque).to receive(:enqueue).with(DestroyProjectFromRepositoryJob, project.id, first_repo.id)
queue = @redis_instance.lrange(AbfWorker::BuildListsPublishTaskManager::PROJECTS_FOR_CLEANUP, 0, -1) first_repo.project_to_repositories.destroy_all
queue.should have(2).item
key = "#{@project.id}-#{@first_repo.id}-#{@platform.id}"
queue.should include(key, "testing-#{key}")
end end
end end

View File

@ -0,0 +1,39 @@
require 'spec_helper'
describe AbfWorkerService::RepositoryMetadata do
let(:repository) { FactoryGirl.build(:repository, id: 123) }
let(:platform) { repository.platform }
let(:repository_status) { double(:repository_status, id: 234, repository: repository, platform: platform) }
before do
stub_symlink_methods
allow(repository_status).to receive(:start_regeneration).and_return(true)
end
subject { AbfWorkerService::RepositoryMetadata.new(repository_status) }
context '#regenerate!' do
context 'repository of main platform' do
it 'creates task' do
expect(subject).to_not receive(:system)
expect(Resque).to receive(:push)
subject.regenerate!
end
end
context 'repository of personal platform' do
before do
allow(platform).to receive(:personal?).and_return(true)
end
it 'creates task' do
expect(subject).to receive(:system)
expect(Resque).to receive(:push)
subject.regenerate!
end
end
end
end

View File

@ -0,0 +1,60 @@
require 'spec_helper'
describe AbfWorkerService::Repository do
let(:repository) { FactoryGirl.build(:repository, id: 123) }
before do
stub_symlink_methods
end
subject { AbfWorkerService::Repository.new(repository) }
context '#destroy_project!' do
let(:project) { FactoryGirl.build(:project, id: 234) }
context 'repository of main platform' do
let(:key) { "#{project.id}-#{repository.id}-#{repository.platform_id}" }
it 'adds to PROJECTS_FOR_CLEANUP queue' do
expect(Redis.current).to receive(:lpush).with(AbfWorkerService::Base::PROJECTS_FOR_CLEANUP, key)
expect(Redis.current).to receive(:lpush).with(AbfWorkerService::Base::PROJECTS_FOR_CLEANUP, 'testing-' << key)
subject.destroy_project!(project)
end
end
context 'repository of personal platform' do
let(:platform1) { FactoryGirl.build(:platform, id: 345) }
let(:platform2) { FactoryGirl.build(:platform, id: 456) }
before do
allow(repository.platform).to receive(:personal?).and_return(true)
allow(Platform).to receive(:main).and_return([platform1, platform2])
end
it 'adds to PROJECTS_FOR_CLEANUP queue' do
[platform1, platform2].each do |platform|
key = "#{project.id}-#{repository.id}-#{platform.id}"
expect(Redis.current).to receive(:lpush).with(AbfWorkerService::Base::PROJECTS_FOR_CLEANUP, key)
expect(Redis.current).to receive(:lpush).with(AbfWorkerService::Base::PROJECTS_FOR_CLEANUP, 'testing-' << key)
end
subject.destroy_project!(project)
end
end
end
context '#resign!' do
let(:repository_status) { double(:repository_status, id: 234) }
it 'creates task' do
expect(repository_status).to receive(:start_resign).and_return(true)
expect(Resque).to receive(:push)
subject.resign!(repository_status)
end
end
end