#823: add BuildListsPublishTaskManager
This commit is contained in:
parent
9ea5f47e45
commit
81931be8cc
|
@ -151,8 +151,8 @@ class BuildList < ActiveRecord::Base
|
|||
after_transition :on => :published, :do => [:set_version_and_tag, :actualize_packages]
|
||||
after_transition :on => :cancel, :do => [:cancel_job],
|
||||
:if => lambda { |build_list| build_list.new_core? }
|
||||
after_transition :on => :publish, :do => [:publish_container],
|
||||
:if => lambda { |build_list| build_list.new_core? }
|
||||
# after_transition :on => :publish, :do => [:publish_container],
|
||||
# :if => lambda { |build_list| build_list.new_core? }
|
||||
|
||||
after_transition :on => [:published, :fail_publish, :build_error], :do => :notify_users
|
||||
after_transition :on => :build_success, :do => :notify_users,
|
||||
|
@ -271,46 +271,46 @@ class BuildList < ActiveRecord::Base
|
|||
can_publish? and not save_to_repository.publish_without_qa
|
||||
end
|
||||
|
||||
def publish_container
|
||||
type = build_for_platform.distrib_type
|
||||
archive = results.find{ |r| r['file_name'] =~ /.*\.tar\.gz$/ }
|
||||
# def publish_container
|
||||
# type = build_for_platform.distrib_type
|
||||
# archive = results.find{ |r| r['file_name'] =~ /.*\.tar\.gz$/ }
|
||||
|
||||
platform_path = "#{save_to_platform.path}/repository"
|
||||
if save_to_platform.personal?
|
||||
platform_path << '/'
|
||||
platform_path << build_for_platform.name
|
||||
Dir.mkdir(platform_path) unless File.exists?(platform_path)
|
||||
end
|
||||
# platform_path = "#{save_to_platform.path}/repository"
|
||||
# if save_to_platform.personal?
|
||||
# platform_path << '/'
|
||||
# platform_path << build_for_platform.name
|
||||
# Dir.mkdir(platform_path) unless File.exists?(platform_path)
|
||||
# end
|
||||
|
||||
packages = last_published.includes(:packages).limit(5).map{ |bl| bl.packages }.flatten
|
||||
sources = packages.map{ |p| p.fullname if p.package_type == 'source' }.compact
|
||||
binaries = packages.map{ |p| p.fullname if p.package_type == 'binary' }.compact
|
||||
# packages = last_published.includes(:packages).limit(5).map{ |bl| bl.packages }.flatten
|
||||
# sources = packages.map{ |p| p.fullname if p.package_type == 'source' }.compact
|
||||
# binaries = packages.map{ |p| p.fullname if p.package_type == 'binary' }.compact
|
||||
|
||||
Resque.push(
|
||||
worker_queue_with_priority("publish_#{type}_worker"),
|
||||
'class' => worker_queue_class("AbfWorker::Publish#{type.capitalize}Worker"),
|
||||
'args' => [{
|
||||
:id => id,
|
||||
:arch => arch.name,
|
||||
:distrib_type => type,
|
||||
:packages => {
|
||||
:sources => packages.by_package_type('source').pluck(:sha1),
|
||||
:binaries => packages.by_package_type('binary').pluck(:sha1)
|
||||
},
|
||||
:old_packages => { :sources => sources, :binaries => binaries },
|
||||
:platform => {
|
||||
:platform_path => platform_path,
|
||||
:released => save_to_platform.released
|
||||
},
|
||||
:repository => {
|
||||
:name => save_to_repository.name,
|
||||
:id => save_to_repository.id
|
||||
},
|
||||
:type => :publish,
|
||||
:time_living => 2400 # 40 min
|
||||
}]
|
||||
)
|
||||
end
|
||||
# Resque.push(
|
||||
# worker_queue_with_priority("publish_#{type}_worker"),
|
||||
# 'class' => worker_queue_class("AbfWorker::Publish#{type.capitalize}Worker"),
|
||||
# 'args' => [{
|
||||
# :id => id,
|
||||
# :arch => arch.name,
|
||||
# :distrib_type => type,
|
||||
# :packages => {
|
||||
# :sources => packages.by_package_type('source').pluck(:sha1),
|
||||
# :binaries => packages.by_package_type('binary').pluck(:sha1)
|
||||
# },
|
||||
# :old_packages => { :sources => sources, :binaries => binaries },
|
||||
# :platform => {
|
||||
# :platform_path => platform_path,
|
||||
# :released => save_to_platform.released
|
||||
# },
|
||||
# :repository => {
|
||||
# :name => save_to_repository.name,
|
||||
# :id => save_to_repository.id
|
||||
# },
|
||||
# :type => :publish,
|
||||
# :time_living => 2400 # 40 min
|
||||
# }]
|
||||
# )
|
||||
# end
|
||||
|
||||
def add_to_queue
|
||||
if new_core?
|
||||
|
@ -426,6 +426,15 @@ class BuildList < ActiveRecord::Base
|
|||
end
|
||||
end
|
||||
|
||||
def last_published
|
||||
BuildList.where(:project_id => self.project_id,
|
||||
:save_to_repository_id => self.save_to_repository_id)
|
||||
.for_platform(self.build_for_platform_id)
|
||||
.scoped_to_arch(self.arch_id)
|
||||
.for_status(BUILD_PUBLISHED)
|
||||
.recent
|
||||
end
|
||||
|
||||
protected
|
||||
|
||||
def abf_worker_priority
|
||||
|
@ -511,12 +520,4 @@ class BuildList < ActiveRecord::Base
|
|||
end
|
||||
end
|
||||
|
||||
def last_published
|
||||
BuildList.where(:project_id => self.project_id,
|
||||
:save_to_repository_id => self.save_to_repository_id)
|
||||
.for_platform(self.build_for_platform_id)
|
||||
.scoped_to_arch(self.arch_id)
|
||||
.for_status(BUILD_PUBLISHED)
|
||||
.recent
|
||||
end
|
||||
end
|
||||
|
|
|
@ -22,3 +22,7 @@ end
|
|||
every 1.day, :at => '3:00 am' do
|
||||
rake "activity_feeds:clear", :output => 'log/activity_feeds.log'
|
||||
end
|
||||
|
||||
every 1.minute do
|
||||
runner 'AbfWorker::BuildListsPublishTaskManager.new.run'
|
||||
end
|
|
@ -0,0 +1,100 @@
|
|||
module AbfWorker
|
||||
class BuildListsPublishTaskManager
|
||||
REDIS_MAIN_KEY = 'abf-worker::build-lists-publish-task-manager::'
|
||||
LOCKED_REP_AND_PLATFORMS = "#{REDIS_MAIN_KEY}locked-repositories-and-platforms"
|
||||
LOCKED_BUILD_LISTS = "#{REDIS_MAIN_KEY}locked-build-lists"
|
||||
|
||||
def initialize
|
||||
@redis = Resque.redis
|
||||
@workers_count = APP_CONFIG['abf_worker']['publish_workers_count']
|
||||
end
|
||||
|
||||
def run
|
||||
available_repos = BuildList.
|
||||
select('MIN(updated_at) as min_updated_at, save_to_repository_id, build_for_platform_id').
|
||||
where(:new_core => true, :status => BuildList::BUILD_PUBLISH).
|
||||
group(:save_to_repository_id, :build_for_platform_id).
|
||||
order(:min_updated_at).
|
||||
limit(@workers_count * 2) # because some repos may be locked
|
||||
|
||||
counter = 1
|
||||
|
||||
# looks like:
|
||||
# ['save_to_repository_id-build_for_platform_id', ...]
|
||||
locked_rep_and_pl = @redis.lrange(LOCKED_REP_AND_PLATFORMS, 0, -1)
|
||||
available_repos.each do |el|
|
||||
key = "#{el.save_to_repository_id}-#{el.build_for_platform_id}"
|
||||
next if locked_rep_and_pl.include?(key)
|
||||
break if counter > @workers_count
|
||||
if create_task(el.save_to_repository_id, el.build_for_platform_id)
|
||||
@redis.lpush(LOCKED_REP_AND_PLATFORMS, key)
|
||||
counter += 1
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def create_task(save_to_repository_id, build_for_platform_id)
|
||||
build_lists = BuildList.
|
||||
where(:new_core => true, :status => BuildList::BUILD_PUBLISH).
|
||||
where(:save_to_repository_id => save_to_repository_id).
|
||||
where(:build_for_platform_id => build_for_platform_id).
|
||||
where('id NOT IN (?)', @redis.lrange(LOCKED_BUILD_LISTS, 0, -1))
|
||||
|
||||
bl = build_lists.first
|
||||
return false unless bl
|
||||
|
||||
type = bl.build_for_platform.distrib_type
|
||||
platform_path = "#{bl.save_to_platform.path}/repository"
|
||||
if bl.save_to_platform.personal?
|
||||
platform_path << '/' << bl.build_for_platform.name
|
||||
Dir.mkdir(platform_path) unless File.exists?(platform_path)
|
||||
end
|
||||
worker_queue = bl.worker_queue_with_priority("publish_worker")
|
||||
worker_class = bl.worker_queue_class("AbfWorker::PublishWorker")
|
||||
|
||||
options = {
|
||||
:id => bl.id,
|
||||
:arch => bl.arch.name,
|
||||
:distrib_type => bl.build_for_platform.distrib_type,
|
||||
:platform => {
|
||||
:platform_path => platform_path,
|
||||
:released => bl.save_to_platform.released
|
||||
},
|
||||
:repository => {
|
||||
:name => bl.save_to_repository.name,
|
||||
:id => bl.save_to_repository.id
|
||||
},
|
||||
:type => :publish,
|
||||
:time_living => 2400 # 40 min
|
||||
}
|
||||
|
||||
packages = {:sources => [], :binaries => {:x86_64 => [], :i586 => []}}
|
||||
old_packages = packages.clone
|
||||
|
||||
build_lists.each do |bl|
|
||||
fill_packages(bl, packages)
|
||||
bl.last_published.includes(:packages).limit(5).each{ |old_bl|
|
||||
fill_packages(old_bl, old_packages)
|
||||
}
|
||||
@redis.lpush(LOCKED_BUILD_LISTS, bl.id)
|
||||
end
|
||||
options.merge!({:packages => packages, :old_packages => old_packages})
|
||||
|
||||
Resque.push(
|
||||
worker_queue,
|
||||
'class' => worker_class,
|
||||
'args' => [options]
|
||||
)
|
||||
return true
|
||||
end
|
||||
|
||||
def fill_packages(bl, results_map)
|
||||
# TODO: remove duplicates of sources for different arches
|
||||
results_map[:sources] |= bl.packages.by_package_type('source').pluck(:sha1)
|
||||
results_map[:binaries][bl.arch.name.to_sym] |= bl.packages.by_package_type('binary').pluck(:sha1)
|
||||
end
|
||||
|
||||
end
|
||||
end
|
|
@ -30,8 +30,6 @@ module AbfWorker
|
|||
true
|
||||
end
|
||||
|
||||
protected
|
||||
|
||||
def worker_queue_with_priority(queue = nil)
|
||||
queue ||= abf_worker_base_queue
|
||||
queue << '_' << abf_worker_priority if abf_worker_priority.present?
|
||||
|
|
Loading…
Reference in New Issue