diff --git a/app/models/project.rb b/app/models/project.rb index dba3cef5c..7c5b45027 100644 --- a/app/models/project.rb +++ b/app/models/project.rb @@ -194,63 +194,11 @@ class Project < ActiveRecord::Base end def destroy_project_from_repository(repository) - platform = repository.platform - published_packages = build_lists.for_status(BuildList::BUILD_PUBLISHED). - scoped_to_save_platform(platform.id) - if platform.personal? - Platform.main.each do |main_platform| - add_job_to_abf_worker_queue( - repository, - platform, - "#{platform.path}/repository/#{main_platform.name}", - main_platform - ) - end - else - add_job_to_abf_worker_queue( - repository, - platform, - "#{platform.path}/repository" - ) - end + AbfWorker::BuildListsPublishTaskManager.destroy_project_from_repository self, repository end - later :destroy_project_from_repository, :queue => :clone_build protected - def add_job_to_abf_worker_queue(repository, platform, platform_path, main_platform = nil) - type = main_platform ? main_platform.distrib_type : platform.distrib_type - Arch.all.each do |arch| - packages = build_lists.for_status(BuildList::BUILD_PUBLISHED). - scoped_to_save_platform(platform.id) - packages = packages.for_platform(main_platform.id) if main_platform - packages = packages.scoped_to_arch(arch.id). - includes(:packages).last(10). - map{ |bl| bl.packages }.flatten - sources = packages.map{ |p| p.fullname if p.package_type == 'source' }.compact - binaries = packages.map{ |p| p.fullname if p.package_type == 'binary' }.compact - next if sources.empty? && binaries.empty? - Resque.push( - "publish_build_list_container_#{type}_worker", - 'class' => "AbfWorker::PublishBuildListContainer#{type.capitalize}Worker", - 'args' => [{ - :id => repository.id, - :arch => arch.name, - :distrib_type => type, - :packages => { :sources => sources, :binaries => binaries }, - :platform => { - :platform_path => platform_path, - :released => platform.released - }, - :repository => { :name => repository.name, :id => repository.id }, - :type => :cleanup, - :save_results => false, - :time_living => 2400 # 40 min - }] - ) - end - end - def truncate_name self.name = name.strip if name end diff --git a/lib/abf_worker/build_lists_publish_task_manager.rb b/lib/abf_worker/build_lists_publish_task_manager.rb index a22b96412..53b31895d 100644 --- a/lib/abf_worker/build_lists_publish_task_manager.rb +++ b/lib/abf_worker/build_lists_publish_task_manager.rb @@ -2,7 +2,13 @@ module AbfWorker class BuildListsPublishTaskManager REDIS_MAIN_KEY = 'abf-worker::build-lists-publish-task-manager::' + RESIGN_REPOSITORIES = "#{REDIS_MAIN_KEY}resign-repositories" + + PROJECTS_FOR_CLEANUP = "#{REDIS_MAIN_KEY}projects-for-cleanup" + LOCKED_PROJECTS_FOR_CLEANUP = "#{REDIS_MAIN_KEY}locked-projects-for-cleanup" + + LOCKED_REPOSITORIES = "#{REDIS_MAIN_KEY}locked-repositories" LOCKED_REP_AND_PLATFORMS = "#{REDIS_MAIN_KEY}locked-repositories-and-platforms" LOCKED_BUILD_LISTS = "#{REDIS_MAIN_KEY}locked-build-lists" @@ -13,35 +19,34 @@ module AbfWorker end def run - available_repos = BuildList. - select('MIN(updated_at) as min_updated_at, save_to_repository_id, build_for_platform_id'). - where(:new_core => true, :status => BuildList::BUILD_PUBLISH). - group(:save_to_repository_id, :build_for_platform_id). - order(:min_updated_at). - limit(@workers_count * 2) # because some repos may be locked - create_tasks_for_resign_repositories - - locked_rep = @redis.lrange(LOCKED_REPOSITORIES, 0, -1) - available_repos = available_repos.where('save_to_repository_id NOT IN (?)', locked_rep) unless locked_rep.empty? - - counter = 1 - - # looks like: - # ['save_to_repository_id-build_for_platform_id', ...] - locked_rep_and_pl = @redis.lrange(LOCKED_REP_AND_PLATFORMS, 0, -1) - available_repos.each do |el| - key = "#{el.save_to_repository_id}-#{el.build_for_platform_id}" - next if locked_rep_and_pl.include?(key) - break if counter > @workers_count - if create_task(el.save_to_repository_id, el.build_for_platform_id) - @redis.lpush(LOCKED_REP_AND_PLATFORMS, key) - counter += 1 - end - end + create_tasks_for_build_rpms end class << self + def destroy_project_from_repository(project, repository) + if repository.platform.personal? + Platform.main.each do |main_platform| + redis.lpush PROJECTS_FOR_CLEANUP, "#{project.id}-#{repository.id}-#{main_platform.id}" + end + else + redis.lpush PROJECTS_FOR_CLEANUP, "#{project.id}-#{repository.id}-#{repository.platform.id}" + end + end + + def cleanup_completed(projects_for_cleanup) + projects_for_cleanup.each do |key| + redis.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key + end + end + + def cleanup_failed(projects_for_cleanup) + projects_for_cleanup.each do |key| + redis.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key + redis.lpush PROJECTS_FOR_CLEANUP, key + end + end + def resign_repository(key_pair) redis.lpush RESIGN_REPOSITORIES, key_pair.repository_id end @@ -55,8 +60,7 @@ module AbfWorker end def unlock_rep_and_platform(build_list) - key = "#{build_list.save_to_repository_id}-#{build_list.build_for_platform_id}" - redis.lrem LOCKED_REP_AND_PLATFORMS, 0, key + redis.lrem LOCKED_REP_AND_PLATFORMS, 0, "#{build_list.save_to_repository_id}-#{build_list.build_for_platform_id}" end def redis @@ -66,11 +70,23 @@ module AbfWorker private + def locked_repositories + @redis.lrange LOCKED_REPOSITORIES, 0, -1 + end + + # def create_tasks_for_cleanup + # reps_to_projects = {} + # @redis.lrange(PROJECTS_AND_REPOS_FOR_CLEANUP, 0, -1).each do |key| + # project_id, repository_id = *key.split('-') + # locked_rep = locked_repositories + # next if locked_repositories + # end + # end + def create_tasks_for_resign_repositories resign_repos = @redis.lrange RESIGN_REPOSITORIES, 0, -1 - locked_repos = @redis.lrange LOCKED_REPOSITORIES, 0, -1 - Repository.where(:id => (resign_repos - locked_repos)).each do |r| + Repository.where(:id => (resign_repos - locked_repositories)).each do |r| @redis.lrem RESIGN_REPOSITORIES, 0, r.id @redis.lpush LOCKED_REPOSITORIES, r.id Resque.push( @@ -96,7 +112,41 @@ module AbfWorker end end - def create_task(save_to_repository_id, build_for_platform_id) + def create_tasks_for_build_rpms + available_repos = BuildList. + select('MIN(updated_at) as min_updated_at, save_to_repository_id, build_for_platform_id'). + where(:new_core => true, :status => BuildList::BUILD_PUBLISH). + group(:save_to_repository_id, :build_for_platform_id). + order(:min_updated_at). + limit(@workers_count * 2) # because some repos may be locked + + locked_rep = locked_repositories + available_repos = available_repos.where('save_to_repository_id NOT IN (?)', locked_rep) unless locked_rep.empty? + + counter = 1 + + # looks like: + # ['save_to_repository_id-build_for_platform_id', ...] + locked_rep_and_pl = @redis.lrange(LOCKED_REP_AND_PLATFORMS, 0, -1) + + for_cleanup = @redis.lrange(PROJECTS_FOR_CLEANUP, 0, -1).map do |key| + pr, rep, pl = *key.split('-') + if locked_rep.present? && locked_rep.include?(rep) + nil + else + [rep.to_i, pl.to_i] + end + end.compact + available_repos = available_repos.map{ |bl| [bl.save_to_repository_id, bl.build_for_platform_id] } | for_cleanup + + available_repos.each do |save_to_repository_id, build_for_platform_id| + next if locked_rep_and_pl.include?("#{save_to_repository_id}-#{build_for_platform_id}") + break if counter > @workers_count + counter += 1 if create_rpm_build_task(save_to_repository_id, build_for_platform_id) + end + end + + def create_rpm_build_task(save_to_repository_id, build_for_platform_id) build_lists = BuildList. where(:new_core => true, :status => BuildList::BUILD_PUBLISH). where(:save_to_repository_id => save_to_repository_id). @@ -105,7 +155,20 @@ module AbfWorker locked_ids = @redis.lrange(LOCKED_BUILD_LISTS, 0, -1) build_lists = build_lists.where('build_lists.id NOT IN (?)', locked_ids) unless locked_ids.empty? - bl = build_lists.first + projects_for_cleanup = @redis.lrange(PROJECTS_FOR_CLEANUP, 0, -1). + select{ |k| k =~ /#{save_to_repository_id}\-#{build_for_platform_id}$/ } + + build_lists_for_cleanup = projects_for_cleanup.map do |key| + pr, rep, pl = *key.split('-') + BuildList.where(:project_id => pr). + where(:new_core => true, :status => BuildList::BUILD_PUBLISHED). + where(:save_to_repository_id => save_to_repository_id). + where(:build_for_platform_id => build_for_platform_id). + order(:updated_at).first + end.compact + + + bl = build_lists.first || build_lists_for_cleanup.first return false unless bl platform_path = "#{bl.save_to_platform.path}/repository" @@ -147,17 +210,31 @@ module AbfWorker build_list_ids << bl.id @redis.lpush(LOCKED_BUILD_LISTS, bl.id) end - packages[:sources] = new_sources.values + + build_lists_for_cleanup.each do |bl| + bl.last_published.includes(:packages).limit(5).each{ |old_bl| + fill_packages(old_bl, old_packages, :fullname) + } + end + Resque.push( worker_queue, 'class' => worker_class, 'args' => [options.merge({ :packages => packages, :old_packages => old_packages, - :build_list_ids => build_list_ids + :build_list_ids => build_list_ids, + :projects_for_cleanup => projects_for_cleanup })] ) + + projects_for_cleanup.each do |key| + redis.lrem PROJECTS_FOR_CLEANUP, 0, key + redis.lpush LOCKED_PROJECTS_FOR_CLEANUP, key + end + + @redis.lpush(LOCKED_REP_AND_PLATFORMS, "#{save_to_repository_id}-#{build_for_platform_id}") return true end diff --git a/lib/abf_worker/publish_observer.rb b/lib/abf_worker/publish_observer.rb index 16379d7c4..457698c63 100644 --- a/lib/abf_worker/publish_observer.rb +++ b/lib/abf_worker/publish_observer.rb @@ -7,10 +7,8 @@ module AbfWorker def perform(options) status = options['status'].to_i return if status == STARTED # do nothing when publication started - case options['type'] - when 'resign' + if options['type'] == 'resign' AbfWorker::BuildListsPublishTaskManager.unlock_repository options['id'] - when 'cleanup' else update_rpm_builds options end @@ -25,12 +23,15 @@ module AbfWorker case status when COMPLETED bl.published + AbfWorker::BuildListsPublishTaskManager.cleanup_completed options['projects_for_cleanup'] when FAILED, CANCELED bl.fail_publish + AbfWorker::BuildListsPublishTaskManager.cleanup_failed options['projects_for_cleanup'] end AbfWorker::BuildListsPublishTaskManager.unlock_build_list bl end - AbfWorker::BuildListsPublishTaskManager.unlock_rep_and_platform build_lists.first + bl = build_lists.first || BuildList.find(options['id']) + AbfWorker::BuildListsPublishTaskManager.unlock_rep_and_platform bl end def update_results(subject, options)