#345: use Redis.current instead of all other

This commit is contained in:
Vokhmin Alexey V 2014-03-19 01:52:40 +04:00
parent cb8246d72f
commit ebe07ba00c
14 changed files with 73 additions and 67 deletions

View File

@ -58,7 +58,7 @@ class Api::V1::JobsController < Api::V1::BaseController
def status def status
if params[:key] =~ /\Aabfworker::(rpm|iso)-worker-[\d]+::live-inspector\z/ if params[:key] =~ /\Aabfworker::(rpm|iso)-worker-[\d]+::live-inspector\z/
status = Resque.redis.get(params[:key]) status = Redis.current.get(params[:key])
end end
render json: { status: status }.to_json render json: { status: status }.to_json
end end

View File

@ -2,13 +2,13 @@ class BuildListsQueuesMonitoringJob
@queue = :hook @queue = :hook
def self.perform def self.perform
redis.smembers('queues').each do |key| Redis.current.smembers('resque:queues').each do |key|
next if key !~ /(user|mass)_build_/ next if key !~ /(user|mass)_build_/
queue = "queue:#{key}" queue = "resque:queue:#{key}"
id = key.gsub(/[^\d]/, '') id = key.gsub(/[^\d]/, '')
if redis.llen(queue) == 0 if Redis.current.llen(queue) == 0
if key =~ /^user/ if key =~ /^user/
last_updated_at = BuildList.select(:updated_at). last_updated_at = BuildList.select(:updated_at).
where(user_id: id).order('updated_at DESC').first where(user_id: id).order('updated_at DESC').first
@ -21,29 +21,25 @@ class BuildListsQueuesMonitoringJob
else else
# ensures that user/mass-build in the set from which we select next jobs # ensures that user/mass-build in the set from which we select next jobs
set_key = key =~ /^user/ ? BuildList::USER_BUILDS_SET : BuildList::MASS_BUILDS_SET set_key = key =~ /^user/ ? BuildList::USER_BUILDS_SET : BuildList::MASS_BUILDS_SET
redis.sadd set_key, id Redis.current.sadd set_key, id
end end
end end
end end
def self.clean(key) def self.clean(key)
queue = "queue:#{key}" queue = "resque:queue:#{key}"
# See [#watch]: https://github.com/redis/redis-rb/blob/master/lib/redis.rb#L2012 # See [#watch]: https://github.com/redis/redis-rb/blob/master/lib/redis.rb#L2012
redis.watch(queue) do Redis.current.watch(queue) do
if redis.llen(queue) == 0 if Redis.current.llen(queue) == 0
redis.multi do |multi| Redis.current.multi do |multi|
multi.del queue multi.del queue
multi.srem 'queues', key multi.srem 'resque:queues', key
end end
else else
redis.unwatch Redis.current.unwatch
end end
end end
end end
def self.redis
@redis ||= Resque.redis
end
end end

View File

@ -534,17 +534,16 @@ class BuildList < ActiveRecord::Base
end end
def self.next_build def self.next_build
redis = Resque.redis kind_id = Redis.current.spop(USER_BUILDS_SET)
kind_id = redis.spop(USER_BUILDS_SET)
key = "user_build_#{kind_id}_rpm_worker_default" if kind_id key = "user_build_#{kind_id}_rpm_worker_default" if kind_id
task = Resque.pop(key) if key task = Resque.pop(key) if key
redis.sadd(USER_BUILDS_SET, kind_id) if task Redis.current.sadd(USER_BUILDS_SET, kind_id) if task
kind_id ||= redis.spop(MASS_BUILDS_SET) kind_id ||= Redis.current.spop(MASS_BUILDS_SET)
key ||= "mass_build_#{kind_id}_rpm_worker" if kind_id key ||= "mass_build_#{kind_id}_rpm_worker" if kind_id
task ||= Resque.pop(key) if key task ||= Resque.pop(key) if key
redis.sadd(MASS_BUILDS_SET, kind_id) if task && key =~ /^mass_build/ Redis.current.sadd(MASS_BUILDS_SET, kind_id) if task && key =~ /^mass_build/
if task if task
build_list = BuildList.where(id: task['args'][0]['id']).first build_list = BuildList.where(id: task['args'][0]['id']).first

View File

@ -52,5 +52,7 @@ module Rosa
# Version of your assets, change this if you want to expire all your assets # Version of your assets, change this if you want to expire all your assets
config.assets.version = '1.0' config.assets.version = '1.0'
config.log_redis = false
end end
end end

View File

@ -55,4 +55,5 @@ Rosa::Application.configure do
config.middleware.insert_before Rails::Rack::Logger, DisableAssetsLogger config.middleware.insert_before Rails::Rack::Logger, DisableAssetsLogger
config.eager_load = false config.eager_load = false
config.log_redis = true
end end

View File

@ -65,4 +65,5 @@ Rosa::Application.configure do
config.assets.precompile += %w(login.css login.js reg_session.css tour.css tour.js gollum/editor/langs/*.js moment/ru.js) config.assets.precompile += %w(login.css login.js reg_session.css tour.css tour.js gollum/editor/langs/*.js moment/ru.js)
config.eager_load = true config.eager_load = true
config.log_redis = true
end end

View File

@ -39,4 +39,5 @@ Rosa::Application.configure do
config.assets.allow_debugging = true config.assets.allow_debugging = true
config.eager_load = false config.eager_load = false
config.log_redis = true
end end

View File

@ -0,0 +1,11 @@
class Redis
def self.connect!
opts = { url: "redis://localhost:6379/#{::Rails.env.test? ? 1 : 0}" }
opts[:logger] = ::Rails.logger if ::Rails.application.config.log_redis
Redis.current = Redis.new(opts)
end
end
Redis.connect!

View File

@ -27,8 +27,8 @@ on_worker_boot do
Rails.logger.info('Connected to PG') Rails.logger.info('Connected to PG')
end end
# Redis.connect! Redis.connect!
# Rails.logger.info('Connected to Redis') Rails.logger.info('Connected to Redis')
end end
activate_control_app 'unix:///tmp/rosa_build_pumactl.sock' activate_control_app 'unix:///tmp/rosa_build_pumactl.sock'

View File

@ -12,7 +12,6 @@ module AbfWorker
end end
def initialize def initialize
@redis = Resque.redis
@workers_count = APP_CONFIG['abf_worker']['publish_workers_count'] @workers_count = APP_CONFIG['abf_worker']['publish_workers_count']
end end
@ -29,33 +28,33 @@ module AbfWorker
if repository.platform.personal? if repository.platform.personal?
Platform.main.each do |main_platform| Platform.main.each do |main_platform|
key = "#{project.id}-#{repository.id}-#{main_platform.id}" key = "#{project.id}-#{repository.id}-#{main_platform.id}"
redis.lpush PROJECTS_FOR_CLEANUP, key Redis.current.lpush PROJECTS_FOR_CLEANUP, key
gather_old_packages project.id, repository.id, main_platform.id gather_old_packages project.id, repository.id, main_platform.id
redis.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key) Redis.current.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key)
gather_old_packages project.id, repository.id, main_platform.id, true gather_old_packages project.id, repository.id, main_platform.id, true
end end
else else
key = "#{project.id}-#{repository.id}-#{repository.platform.id}" key = "#{project.id}-#{repository.id}-#{repository.platform.id}"
redis.lpush PROJECTS_FOR_CLEANUP, key Redis.current.lpush PROJECTS_FOR_CLEANUP, key
gather_old_packages project.id, repository.id, repository.platform.id gather_old_packages project.id, repository.id, repository.platform.id
redis.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key) Redis.current.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key)
gather_old_packages project.id, repository.id, repository.platform.id, true gather_old_packages project.id, repository.id, repository.platform.id, true
end end
end end
def cleanup_completed(projects_for_cleanup) def cleanup_completed(projects_for_cleanup)
projects_for_cleanup.each do |key| projects_for_cleanup.each do |key|
redis.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key Redis.current.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key
redis.hdel PACKAGES_FOR_CLEANUP, key Redis.current.hdel PACKAGES_FOR_CLEANUP, key
end end
end end
def cleanup_failed(projects_for_cleanup) def cleanup_failed(projects_for_cleanup)
projects_for_cleanup.each do |key| projects_for_cleanup.each do |key|
redis.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key Redis.current.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key
redis.lpush PROJECTS_FOR_CLEANUP, key Redis.current.lpush PROJECTS_FOR_CLEANUP, key
end end
end end
@ -63,12 +62,12 @@ module AbfWorker
return if build_lists.blank? return if build_lists.blank?
rep_pl = "#{repository_id}-#{platform_id}" rep_pl = "#{repository_id}-#{platform_id}"
key = "#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{rep_pl}" key = "#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{rep_pl}"
redis.sadd REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING, rep_pl Redis.current.sadd REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING, rep_pl
redis.sadd key, build_lists Redis.current.sadd key, build_lists
end end
def unlock_build_list(build_list) def unlock_build_list(build_list)
redis.lrem LOCKED_BUILD_LISTS, 0, build_list.id Redis.current.lrem LOCKED_BUILD_LISTS, 0, build_list.id
end end
def packages_structure def packages_structure
@ -77,10 +76,6 @@ module AbfWorker
structure structure
end end
def redis
Resque.redis
end
def create_container_for(build_list) def create_container_for(build_list)
platform_path = "#{build_list.save_to_platform.path}/container/#{build_list.id}" platform_path = "#{build_list.save_to_platform.path}/container/#{build_list.id}"
system "rm -rf #{platform_path} && mkdir -p #{platform_path}" system "rm -rf #{platform_path} && mkdir -p #{platform_path}"
@ -146,7 +141,7 @@ module AbfWorker
} }
end end
key = (testing ? 'testing-' : '') << "#{project_id}-#{repository_id}-#{platform_id}" key = (testing ? 'testing-' : '') << "#{project_id}-#{repository_id}-#{platform_id}"
redis.hset PACKAGES_FOR_CLEANUP, key, old_packages.to_json Redis.current.hset PACKAGES_FOR_CLEANUP, key, old_packages.to_json
end end
def fill_packages(bl, results_map, field = :sha1) def fill_packages(bl, results_map, field = :sha1)
@ -216,14 +211,14 @@ module AbfWorker
where(platforms: {platform_type: 'main'}).pluck(:repository_id) where(platforms: {platform_type: 'main'}).pluck(:repository_id)
available_repos = available_repos.where('save_to_repository_id NOT IN (?)', locked_rep) unless locked_rep.empty? available_repos = available_repos.where('save_to_repository_id NOT IN (?)', locked_rep) unless locked_rep.empty?
for_cleanup = @redis.lrange(PROJECTS_FOR_CLEANUP, 0, -1).map do |key| for_cleanup = Redis.current.lrange(PROJECTS_FOR_CLEANUP, 0, -1).map do |key|
next if testing && key !~ /^testing-/ next if testing && key !~ /^testing-/
rep, pl = *key.split('-').last(2) rep, pl = *key.split('-').last(2)
locked_rep.present? && locked_rep.include?(rep.to_i) ? nil : [rep.to_i, pl.to_i] locked_rep.present? && locked_rep.include?(rep.to_i) ? nil : [rep.to_i, pl.to_i]
end.compact end.compact
for_cleanup_from_testing = @redis.smembers(REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING).map do |key| for_cleanup_from_testing = Redis.current.smembers(REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING).map do |key|
next if @redis.scard("#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{key}") == 0 next if Redis.current.scard("#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{key}") == 0
rep, pl = *key.split('-') rep, pl = *key.split('-')
locked_rep.present? && locked_rep.include?(rep.to_i) ? nil : [rep.to_i, pl.to_i] locked_rep.present? && locked_rep.include?(rep.to_i) ? nil : [rep.to_i, pl.to_i]
end.compact if testing end.compact if testing
@ -240,7 +235,7 @@ module AbfWorker
def create_rpm_build_task(save_to_repository_id, build_for_platform_id, testing) def create_rpm_build_task(save_to_repository_id, build_for_platform_id, testing)
key = "#{save_to_repository_id}-#{build_for_platform_id}" key = "#{save_to_repository_id}-#{build_for_platform_id}"
projects_for_cleanup = @redis.lrange(PROJECTS_FOR_CLEANUP, 0, -1).select do |k| projects_for_cleanup = Redis.current.lrange(PROJECTS_FOR_CLEANUP, 0, -1).select do |k|
(testing && k =~ /^testing-[\d]+-#{key}$/) || (!testing && k =~ /^[\d]+-#{key}$/) (testing && k =~ /^testing-[\d]+-#{key}$/) || (!testing && k =~ /^[\d]+-#{key}$/)
end end
@ -257,15 +252,15 @@ module AbfWorker
where(save_to_repository_id: save_to_repository_id). where(save_to_repository_id: save_to_repository_id).
where(build_for_platform_id: build_for_platform_id). where(build_for_platform_id: build_for_platform_id).
order(:updated_at) order(:updated_at)
locked_ids = @redis.lrange(LOCKED_BUILD_LISTS, 0, -1) locked_ids = Redis.current.lrange(LOCKED_BUILD_LISTS, 0, -1)
build_lists = build_lists.where('build_lists.id NOT IN (?)', locked_ids) unless locked_ids.empty? build_lists = build_lists.where('build_lists.id NOT IN (?)', locked_ids) unless locked_ids.empty?
build_lists = build_lists.limit(150) build_lists = build_lists.limit(150)
old_packages = self.class.packages_structure old_packages = self.class.packages_structure
projects_for_cleanup.each do |key| projects_for_cleanup.each do |key|
@redis.lrem PROJECTS_FOR_CLEANUP, 0, key Redis.current.lrem PROJECTS_FOR_CLEANUP, 0, key
packages = @redis.hget PACKAGES_FOR_CLEANUP, key packages = Redis.current.hget PACKAGES_FOR_CLEANUP, key
next unless packages next unless packages
packages = JSON.parse packages packages = JSON.parse packages
old_packages[:sources] |= packages['sources'] old_packages[:sources] |= packages['sources']
@ -275,7 +270,7 @@ module AbfWorker
end end
if testing if testing
build_lists_for_cleanup_from_testing = @redis.smembers("#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{save_to_repository_id}-#{build_for_platform_id}") build_lists_for_cleanup_from_testing = Redis.current.smembers("#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{save_to_repository_id}-#{build_for_platform_id}")
BuildList.where(id: build_lists_for_cleanup_from_testing).each do |b| BuildList.where(id: build_lists_for_cleanup_from_testing).each do |b|
self.class.fill_packages(b, old_packages, :fullname) self.class.fill_packages(b, old_packages, :fullname)
end if build_lists_for_cleanup_from_testing.present? end if build_lists_for_cleanup_from_testing.present?
@ -343,7 +338,7 @@ module AbfWorker
self.class.fill_packages(old_bl, old_packages, :fullname) self.class.fill_packages(old_bl, old_packages, :fullname)
} if testing } if testing
build_list_ids << bl.id build_list_ids << bl.id
@redis.lpush(LOCKED_BUILD_LISTS, bl.id) Redis.current.lpush(LOCKED_BUILD_LISTS, bl.id)
end end
packages[:sources] = new_sources.values.compact packages[:sources] = new_sources.values.compact
@ -359,16 +354,16 @@ module AbfWorker
) )
projects_for_cleanup.each do |key| projects_for_cleanup.each do |key|
@redis.lpush LOCKED_PROJECTS_FOR_CLEANUP, key Redis.current.lpush LOCKED_PROJECTS_FOR_CLEANUP, key
end end
rep_pl = "#{save_to_repository_id}-#{build_for_platform_id}" rep_pl = "#{save_to_repository_id}-#{build_for_platform_id}"
r_key = "#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{rep_pl}" r_key = "#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{rep_pl}"
build_lists_for_cleanup_from_testing.each do |key| build_lists_for_cleanup_from_testing.each do |key|
@redis.srem r_key, key Redis.current.srem r_key, key
end end
if @redis.scard(r_key) == 0 if Redis.current.scard(r_key) == 0
@redis.srem REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING, rep_pl Redis.current.srem REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING, rep_pl
end end
return true return true

View File

@ -39,7 +39,7 @@ module AbfWorker::ModelHelper
def restart_job def restart_job
update_build_sets update_build_sets
Resque.redis.lpush "queue:#{worker_queue_with_priority}", Redis.current.lpush "resque:queue:#{worker_queue_with_priority}",
Resque.encode({'class' => worker_queue_class, 'args' => [abf_worker_args]}) Resque.encode({'class' => worker_queue_class, 'args' => [abf_worker_args]})
end end
@ -87,15 +87,15 @@ module AbfWorker::ModelHelper
return unless is_a?(BuildList) return unless is_a?(BuildList)
key = mass_build_id ? MASS_BUILDS_SET : USER_BUILDS_SET key = mass_build_id ? MASS_BUILDS_SET : USER_BUILDS_SET
Resque.redis.pipelined do Redis.current.pipelined do
Resque.redis.sadd key, mass_build_id || user_id Redis.current.sadd key, mass_build_id || user_id
Resque.redis.sadd 'queues', worker_queue_with_priority Redis.current.sadd 'resque:queues', worker_queue_with_priority
end end
end end
def send_stop_signal def send_stop_signal
Resque.redis.setex( Redis.current.setex(
"#{service_queue}::live-inspector", "#{service_queue}::live-inspector",
240, # Data will be removed from Redis after 240 sec. 240, # Data will be removed from Redis after 240 sec.
'USR1' # Immediately kill child but don't exit 'USR1' # Immediately kill child but don't exit

View File

@ -57,11 +57,10 @@ module AbfWorker
def restart_task def restart_task
return false if status != FAILED return false if status != FAILED
redis = Resque.redis if Redis.current.lrem(RESTARTED_BUILD_LISTS, 0, subject.id) > 0 || (options['results'] || []).size > 1
if redis.lrem(RESTARTED_BUILD_LISTS, 0, subject.id) > 0 || (options['results'] || []).size > 1
return false return false
else else
redis.lpush RESTARTED_BUILD_LISTS, subject.id Redis.current.lpush RESTARTED_BUILD_LISTS, subject.id
subject.update_column(:status, BuildList::BUILD_PENDING) subject.update_column(:status, BuildList::BUILD_PENDING)
subject.restart_job if subject.external_nodes.blank? subject.restart_job if subject.external_nodes.blank?
return true return true

View File

@ -21,9 +21,9 @@ module AbfWorker
end end
def count_of_tasks(regexp) def count_of_tasks(regexp)
Resque.redis.smembers('queues'). Redis.current.smembers('resque:queues').
select{ |q| q =~ /#{regexp}/ }. select{ |q| q =~ /#{regexp}/ }.
map{ |q| Resque.redis.llen("queue:#{q}") }.sum map{ |q| Redis.current.llen("resque:queue:#{q}") }.sum
end end
def products_status def products_status
@ -45,8 +45,8 @@ module AbfWorker
end end
def status_of_worker(workers, worker) def status_of_worker(workers, worker)
redis, key = Resque.redis, "queue:#{worker}_worker" key = "resque:queue:#{worker}_worker"
default_tasks, tasks = redis.llen("#{key}_default"), redis.llen(key) default_tasks, tasks = Redis.current.llen("#{key}_default"), Redis.current.llen(key)
{ {
workers: workers.count, workers: workers.count,
build_tasks: workers.select{ |w| w.working? }.count, build_tasks: workers.select{ |w| w.working? }.count,

View File

@ -65,8 +65,9 @@ end
def stub_redis def stub_redis
@redis_instance = MockRedis.new @redis_instance = MockRedis.new
Redis.stub(:new).and_return { @redis_instance } allow(Redis).to receive(:new).and_return(@redis_instance)
Redis::Store.stub(:new).and_return { @redis_instance } allow(Redis).to receive(:current).and_return(@redis_instance)
allow(Redis::Store).to receive(:new).and_return(@redis_instance)
end end
def fill_project project def fill_project project