From 81931be8cc0ecb7982605d8bf5b9d3979f9df45c Mon Sep 17 00:00:00 2001 From: Vokhmin Alexey V Date: Sat, 12 Jan 2013 05:36:00 +0400 Subject: [PATCH] #823: add BuildListsPublishTaskManager --- app/models/build_list.rb | 95 +++++++++-------- config/schedule.rb | 4 + .../build_lists_publish_task_manager.rb | 100 ++++++++++++++++++ lib/abf_worker/model_helper.rb | 2 - 4 files changed, 152 insertions(+), 49 deletions(-) create mode 100644 lib/abf_worker/build_lists_publish_task_manager.rb diff --git a/app/models/build_list.rb b/app/models/build_list.rb index 3d779c34e..6efdc3c94 100644 --- a/app/models/build_list.rb +++ b/app/models/build_list.rb @@ -151,8 +151,8 @@ class BuildList < ActiveRecord::Base after_transition :on => :published, :do => [:set_version_and_tag, :actualize_packages] after_transition :on => :cancel, :do => [:cancel_job], :if => lambda { |build_list| build_list.new_core? } - after_transition :on => :publish, :do => [:publish_container], - :if => lambda { |build_list| build_list.new_core? } + # after_transition :on => :publish, :do => [:publish_container], + # :if => lambda { |build_list| build_list.new_core? } after_transition :on => [:published, :fail_publish, :build_error], :do => :notify_users after_transition :on => :build_success, :do => :notify_users, @@ -271,46 +271,46 @@ class BuildList < ActiveRecord::Base can_publish? and not save_to_repository.publish_without_qa end - def publish_container - type = build_for_platform.distrib_type - archive = results.find{ |r| r['file_name'] =~ /.*\.tar\.gz$/ } + # def publish_container + # type = build_for_platform.distrib_type + # archive = results.find{ |r| r['file_name'] =~ /.*\.tar\.gz$/ } - platform_path = "#{save_to_platform.path}/repository" - if save_to_platform.personal? - platform_path << '/' - platform_path << build_for_platform.name - Dir.mkdir(platform_path) unless File.exists?(platform_path) - end + # platform_path = "#{save_to_platform.path}/repository" + # if save_to_platform.personal? + # platform_path << '/' + # platform_path << build_for_platform.name + # Dir.mkdir(platform_path) unless File.exists?(platform_path) + # end - packages = last_published.includes(:packages).limit(5).map{ |bl| bl.packages }.flatten - sources = packages.map{ |p| p.fullname if p.package_type == 'source' }.compact - binaries = packages.map{ |p| p.fullname if p.package_type == 'binary' }.compact + # packages = last_published.includes(:packages).limit(5).map{ |bl| bl.packages }.flatten + # sources = packages.map{ |p| p.fullname if p.package_type == 'source' }.compact + # binaries = packages.map{ |p| p.fullname if p.package_type == 'binary' }.compact - Resque.push( - worker_queue_with_priority("publish_#{type}_worker"), - 'class' => worker_queue_class("AbfWorker::Publish#{type.capitalize}Worker"), - 'args' => [{ - :id => id, - :arch => arch.name, - :distrib_type => type, - :packages => { - :sources => packages.by_package_type('source').pluck(:sha1), - :binaries => packages.by_package_type('binary').pluck(:sha1) - }, - :old_packages => { :sources => sources, :binaries => binaries }, - :platform => { - :platform_path => platform_path, - :released => save_to_platform.released - }, - :repository => { - :name => save_to_repository.name, - :id => save_to_repository.id - }, - :type => :publish, - :time_living => 2400 # 40 min - }] - ) - end + # Resque.push( + # worker_queue_with_priority("publish_#{type}_worker"), + # 'class' => worker_queue_class("AbfWorker::Publish#{type.capitalize}Worker"), + # 'args' => [{ + # :id => id, + # :arch => arch.name, + # :distrib_type => type, + # :packages => { + # :sources => packages.by_package_type('source').pluck(:sha1), + # :binaries => packages.by_package_type('binary').pluck(:sha1) + # }, + # :old_packages => { :sources => sources, :binaries => binaries }, + # :platform => { + # :platform_path => platform_path, + # :released => save_to_platform.released + # }, + # :repository => { + # :name => save_to_repository.name, + # :id => save_to_repository.id + # }, + # :type => :publish, + # :time_living => 2400 # 40 min + # }] + # ) + # end def add_to_queue if new_core? @@ -426,6 +426,15 @@ class BuildList < ActiveRecord::Base end end + def last_published + BuildList.where(:project_id => self.project_id, + :save_to_repository_id => self.save_to_repository_id) + .for_platform(self.build_for_platform_id) + .scoped_to_arch(self.arch_id) + .for_status(BUILD_PUBLISHED) + .recent + end + protected def abf_worker_priority @@ -511,12 +520,4 @@ class BuildList < ActiveRecord::Base end end - def last_published - BuildList.where(:project_id => self.project_id, - :save_to_repository_id => self.save_to_repository_id) - .for_platform(self.build_for_platform_id) - .scoped_to_arch(self.arch_id) - .for_status(BUILD_PUBLISHED) - .recent - end end diff --git a/config/schedule.rb b/config/schedule.rb index dc4327b80..e3960634b 100644 --- a/config/schedule.rb +++ b/config/schedule.rb @@ -22,3 +22,7 @@ end every 1.day, :at => '3:00 am' do rake "activity_feeds:clear", :output => 'log/activity_feeds.log' end + +every 1.minute do + runner 'AbfWorker::BuildListsPublishTaskManager.new.run' +end \ No newline at end of file diff --git a/lib/abf_worker/build_lists_publish_task_manager.rb b/lib/abf_worker/build_lists_publish_task_manager.rb new file mode 100644 index 000000000..a01f58ccb --- /dev/null +++ b/lib/abf_worker/build_lists_publish_task_manager.rb @@ -0,0 +1,100 @@ +module AbfWorker + class BuildListsPublishTaskManager + REDIS_MAIN_KEY = 'abf-worker::build-lists-publish-task-manager::' + LOCKED_REP_AND_PLATFORMS = "#{REDIS_MAIN_KEY}locked-repositories-and-platforms" + LOCKED_BUILD_LISTS = "#{REDIS_MAIN_KEY}locked-build-lists" + + def initialize + @redis = Resque.redis + @workers_count = APP_CONFIG['abf_worker']['publish_workers_count'] + end + + def run + available_repos = BuildList. + select('MIN(updated_at) as min_updated_at, save_to_repository_id, build_for_platform_id'). + where(:new_core => true, :status => BuildList::BUILD_PUBLISH). + group(:save_to_repository_id, :build_for_platform_id). + order(:min_updated_at). + limit(@workers_count * 2) # because some repos may be locked + + counter = 1 + + # looks like: + # ['save_to_repository_id-build_for_platform_id', ...] + locked_rep_and_pl = @redis.lrange(LOCKED_REP_AND_PLATFORMS, 0, -1) + available_repos.each do |el| + key = "#{el.save_to_repository_id}-#{el.build_for_platform_id}" + next if locked_rep_and_pl.include?(key) + break if counter > @workers_count + if create_task(el.save_to_repository_id, el.build_for_platform_id) + @redis.lpush(LOCKED_REP_AND_PLATFORMS, key) + counter += 1 + end + end + end + + private + + def create_task(save_to_repository_id, build_for_platform_id) + build_lists = BuildList. + where(:new_core => true, :status => BuildList::BUILD_PUBLISH). + where(:save_to_repository_id => save_to_repository_id). + where(:build_for_platform_id => build_for_platform_id). + where('id NOT IN (?)', @redis.lrange(LOCKED_BUILD_LISTS, 0, -1)) + + bl = build_lists.first + return false unless bl + + type = bl.build_for_platform.distrib_type + platform_path = "#{bl.save_to_platform.path}/repository" + if bl.save_to_platform.personal? + platform_path << '/' << bl.build_for_platform.name + Dir.mkdir(platform_path) unless File.exists?(platform_path) + end + worker_queue = bl.worker_queue_with_priority("publish_worker") + worker_class = bl.worker_queue_class("AbfWorker::PublishWorker") + + options = { + :id => bl.id, + :arch => bl.arch.name, + :distrib_type => bl.build_for_platform.distrib_type, + :platform => { + :platform_path => platform_path, + :released => bl.save_to_platform.released + }, + :repository => { + :name => bl.save_to_repository.name, + :id => bl.save_to_repository.id + }, + :type => :publish, + :time_living => 2400 # 40 min + } + + packages = {:sources => [], :binaries => {:x86_64 => [], :i586 => []}} + old_packages = packages.clone + + build_lists.each do |bl| + fill_packages(bl, packages) + bl.last_published.includes(:packages).limit(5).each{ |old_bl| + fill_packages(old_bl, old_packages) + } + @redis.lpush(LOCKED_BUILD_LISTS, bl.id) + end + options.merge!({:packages => packages, :old_packages => old_packages}) + + Resque.push( + worker_queue, + 'class' => worker_class, + 'args' => [options] + ) + return true + end + + def fill_packages(bl, results_map) + # TODO: remove duplicates of sources for different arches + results_map[:sources] |= bl.packages.by_package_type('source').pluck(:sha1) + results_map[:binaries][bl.arch.name.to_sym] |= bl.packages.by_package_type('binary').pluck(:sha1) + end + + end +end \ No newline at end of file diff --git a/lib/abf_worker/model_helper.rb b/lib/abf_worker/model_helper.rb index 2d240f01f..9d8de6653 100644 --- a/lib/abf_worker/model_helper.rb +++ b/lib/abf_worker/model_helper.rb @@ -30,8 +30,6 @@ module AbfWorker true end - protected - def worker_queue_with_priority(queue = nil) queue ||= abf_worker_base_queue queue << '_' << abf_worker_priority if abf_worker_priority.present?