From 829b3b83fea6fdef817e57305dd9a9a62d66def3 Mon Sep 17 00:00:00 2001 From: Vokhmin Alexey V Date: Thu, 17 Jan 2013 18:15:03 +0400 Subject: [PATCH] #794: update logic for resign packages in repository --- app/models/key_pair.rb | 26 +------- .../build_lists_publish_task_manager.rb | 65 +++++++++++++++++-- lib/abf_worker/publish_observer.rb | 58 +++++++++++------ 3 files changed, 96 insertions(+), 53 deletions(-) diff --git a/app/models/key_pair.rb b/app/models/key_pair.rb index 2316ff503..f23711fc9 100644 --- a/app/models/key_pair.rb +++ b/app/models/key_pair.rb @@ -15,35 +15,11 @@ class KeyPair < ActiveRecord::Base validate :check_keys before_create { |record| record.key_id = @fingerprint } - after_create :resign_rpms, + after_create { |record| AbfWorker::BuildListsPublishTaskManager.resign_repository record }, :unless => Proc.new { |key_pair| key_pair.repository.platform.personal? } protected - def resign_rpms - platform = repository.platform - Resque.push( - 'publish_worker_default', - 'class' => "AbfWorker::PublishWorkerDefault", - 'args' => [{ - :id => id, - :arch => 'x86_64', - :distrib_type => platform.distrib_type, - :platform => { - :platform_path => "#{platform.path}/repository", - :released => platform.released - }, - :repository => { - :name => repository.name, - :id => repository.id - }, - :type => :resign, - :save_results => false, - :time_living => 2400 # 40 min - }] - ) - end - def check_keys dir = Dir.mktmpdir('keys-', "#{APP_CONFIG['root_path']}/tmp") begin diff --git a/lib/abf_worker/build_lists_publish_task_manager.rb b/lib/abf_worker/build_lists_publish_task_manager.rb index 45eb87a5d..a22b96412 100644 --- a/lib/abf_worker/build_lists_publish_task_manager.rb +++ b/lib/abf_worker/build_lists_publish_task_manager.rb @@ -2,11 +2,13 @@ module AbfWorker class BuildListsPublishTaskManager REDIS_MAIN_KEY = 'abf-worker::build-lists-publish-task-manager::' + RESIGN_REPOSITORIES = "#{REDIS_MAIN_KEY}resign-repositories" + LOCKED_REPOSITORIES = "#{REDIS_MAIN_KEY}locked-repositories" LOCKED_REP_AND_PLATFORMS = "#{REDIS_MAIN_KEY}locked-repositories-and-platforms" LOCKED_BUILD_LISTS = "#{REDIS_MAIN_KEY}locked-build-lists" def initialize - @redis = Resque.redis + @redis = self.redis @workers_count = APP_CONFIG['abf_worker']['publish_workers_count'] end @@ -18,6 +20,11 @@ module AbfWorker order(:min_updated_at). limit(@workers_count * 2) # because some repos may be locked + create_tasks_for_resign_repositories + + locked_rep = @redis.lrange(LOCKED_REPOSITORIES, 0, -1) + available_repos = available_repos.where('save_to_repository_id NOT IN (?)', locked_rep) unless locked_rep.empty? + counter = 1 # looks like: @@ -34,17 +41,61 @@ module AbfWorker end end - def self.unlock_build_list(build_list) - Resque.redis.lrem(LOCKED_BUILD_LISTS, 0, build_list.id) - end + class << self + def resign_repository(key_pair) + redis.lpush RESIGN_REPOSITORIES, key_pair.repository_id + end - def self.unlock_rep_and_platform(build_list) - key = "#{build_list.save_to_repository_id}-#{build_list.build_for_platform_id}" - Resque.redis.lrem(LOCKED_REP_AND_PLATFORMS, 0, key) + def unlock_repository(repository_id) + redis.lrem LOCKED_REPOSITORIES, 0, repository_id + end + + def unlock_build_list(build_list) + redis.lrem LOCKED_BUILD_LISTS, 0, build_list.id + end + + def unlock_rep_and_platform(build_list) + key = "#{build_list.save_to_repository_id}-#{build_list.build_for_platform_id}" + redis.lrem LOCKED_REP_AND_PLATFORMS, 0, key + end + + def redis + Resque.redis + end end private + def create_tasks_for_resign_repositories + resign_repos = @redis.lrange RESIGN_REPOSITORIES, 0, -1 + locked_repos = @redis.lrange LOCKED_REPOSITORIES, 0, -1 + + Repository.where(:id => (resign_repos - locked_repos)).each do |r| + @redis.lrem RESIGN_REPOSITORIES, 0, r.id + @redis.lpush LOCKED_REPOSITORIES, r.id + Resque.push( + 'publish_worker_default', + 'class' => "AbfWorker::PublishWorkerDefault", + 'args' => [{ + :id => r.id, + :arch => 'x86_64', + :distrib_type => r.platform.distrib_type, + :platform => { + :platform_path => "#{r.platform.path}/repository", + :released => r.platform.released + }, + :repository => { + :name => r.name, + :id => r.id + }, + :type => :resign, + :skip_feedback => true, + :time_living => 2400 # 40 min + }] + ) + end + end + def create_task(save_to_repository_id, build_for_platform_id) build_lists = BuildList. where(:new_core => true, :status => BuildList::BUILD_PUBLISH). diff --git a/lib/abf_worker/publish_observer.rb b/lib/abf_worker/publish_observer.rb index 3071e21b3..16379d7c4 100644 --- a/lib/abf_worker/publish_observer.rb +++ b/lib/abf_worker/publish_observer.rb @@ -2,28 +2,44 @@ module AbfWorker class PublishObserver < AbfWorker::BaseObserver @queue = :publish_observer - def self.perform(options) - status = options['status'].to_i - return if status == STARTED # do nothing when publication started - build_lists = BuildList.where(:id => options['build_list_ids']) - build_lists.each do |bl| - update_results(bl, options) - case status - when COMPLETED - bl.published - when FAILED, CANCELED - bl.fail_publish - end - AbfWorker::BuildListsPublishTaskManager.unlock_build_list bl - end - AbfWorker::BuildListsPublishTaskManager.unlock_rep_and_platform build_lists.first - end - def self.update_results(subject, options) - results = (subject.results || []). - select{ |r| r['file_name'] !~ /^abfworker\:\:publish\-worker.*\.log$/ } - results |= options['results'] - sort_results_and_save(subject, results) + class << self + def perform(options) + status = options['status'].to_i + return if status == STARTED # do nothing when publication started + case options['type'] + when 'resign' + AbfWorker::BuildListsPublishTaskManager.unlock_repository options['id'] + when 'cleanup' + else + update_rpm_builds options + end + end + + protected + + def update_rpm_builds(options) + build_lists = BuildList.where(:id => options['build_list_ids']) + build_lists.each do |bl| + update_results(bl, options) + case status + when COMPLETED + bl.published + when FAILED, CANCELED + bl.fail_publish + end + AbfWorker::BuildListsPublishTaskManager.unlock_build_list bl + end + AbfWorker::BuildListsPublishTaskManager.unlock_rep_and_platform build_lists.first + end + + def update_results(subject, options) + results = (subject.results || []). + select{ |r| r['file_name'] !~ /^abfworker\:\:publish\-worker.*\.log$/ } + results |= options['results'] + sort_results_and_save(subject, results) + end + end end