From ebe07ba00ce289bed4e37b5bebf8a469d9bc58e1 Mon Sep 17 00:00:00 2001 From: Vokhmin Alexey V Date: Wed, 19 Mar 2014 01:52:40 +0400 Subject: [PATCH] #345: use Redis.current instead of all other --- app/controllers/api/v1/jobs_controller.rb | 2 +- app/jobs/build_lists_queues_monitoring_job.rb | 24 ++++---- app/models/build_list.rb | 9 ++- config/application.rb | 2 + config/environments/development.rb | 1 + config/environments/staging.rb | 1 + config/environments/test.rb | 1 + config/initializers/redis.rb | 11 ++++ config/puma/production.rb | 4 +- .../build_lists_publish_task_manager.rb | 57 +++++++++---------- lib/abf_worker/model_helper.rb | 10 ++-- lib/abf_worker/rpm_worker_observer.rb | 5 +- lib/abf_worker/status_inspector.rb | 8 +-- spec/spec_helper.rb | 5 +- 14 files changed, 73 insertions(+), 67 deletions(-) create mode 100644 config/initializers/redis.rb diff --git a/app/controllers/api/v1/jobs_controller.rb b/app/controllers/api/v1/jobs_controller.rb index 5b1ffb057..73af36564 100644 --- a/app/controllers/api/v1/jobs_controller.rb +++ b/app/controllers/api/v1/jobs_controller.rb @@ -58,7 +58,7 @@ class Api::V1::JobsController < Api::V1::BaseController def status if params[:key] =~ /\Aabfworker::(rpm|iso)-worker-[\d]+::live-inspector\z/ - status = Resque.redis.get(params[:key]) + status = Redis.current.get(params[:key]) end render json: { status: status }.to_json end diff --git a/app/jobs/build_lists_queues_monitoring_job.rb b/app/jobs/build_lists_queues_monitoring_job.rb index 7c10d4b2f..08b4db60a 100644 --- a/app/jobs/build_lists_queues_monitoring_job.rb +++ b/app/jobs/build_lists_queues_monitoring_job.rb @@ -2,13 +2,13 @@ class BuildListsQueuesMonitoringJob @queue = :hook def self.perform - redis.smembers('queues').each do |key| + Redis.current.smembers('resque:queues').each do |key| next if key !~ /(user|mass)_build_/ - queue = "queue:#{key}" + queue = "resque:queue:#{key}" id = key.gsub(/[^\d]/, '') - if redis.llen(queue) == 0 + if Redis.current.llen(queue) == 0 if key =~ /^user/ last_updated_at = BuildList.select(:updated_at). where(user_id: id).order('updated_at DESC').first @@ -21,29 +21,25 @@ class BuildListsQueuesMonitoringJob else # ensures that user/mass-build in the set from which we select next jobs set_key = key =~ /^user/ ? BuildList::USER_BUILDS_SET : BuildList::MASS_BUILDS_SET - redis.sadd set_key, id + Redis.current.sadd set_key, id end end end def self.clean(key) - queue = "queue:#{key}" + queue = "resque:queue:#{key}" # See [#watch]: https://github.com/redis/redis-rb/blob/master/lib/redis.rb#L2012 - redis.watch(queue) do - if redis.llen(queue) == 0 - redis.multi do |multi| + Redis.current.watch(queue) do + if Redis.current.llen(queue) == 0 + Redis.current.multi do |multi| multi.del queue - multi.srem 'queues', key + multi.srem 'resque:queues', key end else - redis.unwatch + Redis.current.unwatch end end end - def self.redis - @redis ||= Resque.redis - end - end diff --git a/app/models/build_list.rb b/app/models/build_list.rb index 260591c9f..57c027c5a 100644 --- a/app/models/build_list.rb +++ b/app/models/build_list.rb @@ -534,17 +534,16 @@ class BuildList < ActiveRecord::Base end def self.next_build - redis = Resque.redis - kind_id = redis.spop(USER_BUILDS_SET) + kind_id = Redis.current.spop(USER_BUILDS_SET) key = "user_build_#{kind_id}_rpm_worker_default" if kind_id task = Resque.pop(key) if key - redis.sadd(USER_BUILDS_SET, kind_id) if task + Redis.current.sadd(USER_BUILDS_SET, kind_id) if task - kind_id ||= redis.spop(MASS_BUILDS_SET) + kind_id ||= Redis.current.spop(MASS_BUILDS_SET) key ||= "mass_build_#{kind_id}_rpm_worker" if kind_id task ||= Resque.pop(key) if key - redis.sadd(MASS_BUILDS_SET, kind_id) if task && key =~ /^mass_build/ + Redis.current.sadd(MASS_BUILDS_SET, kind_id) if task && key =~ /^mass_build/ if task build_list = BuildList.where(id: task['args'][0]['id']).first diff --git a/config/application.rb b/config/application.rb index d91154867..8c0e2210a 100644 --- a/config/application.rb +++ b/config/application.rb @@ -52,5 +52,7 @@ module Rosa # Version of your assets, change this if you want to expire all your assets config.assets.version = '1.0' + + config.log_redis = false end end diff --git a/config/environments/development.rb b/config/environments/development.rb index 9c364ad9f..09476e464 100644 --- a/config/environments/development.rb +++ b/config/environments/development.rb @@ -55,4 +55,5 @@ Rosa::Application.configure do config.middleware.insert_before Rails::Rack::Logger, DisableAssetsLogger config.eager_load = false + config.log_redis = true end diff --git a/config/environments/staging.rb b/config/environments/staging.rb index b0711b5f3..f5b7d0681 100644 --- a/config/environments/staging.rb +++ b/config/environments/staging.rb @@ -65,4 +65,5 @@ Rosa::Application.configure do config.assets.precompile += %w(login.css login.js reg_session.css tour.css tour.js gollum/editor/langs/*.js moment/ru.js) config.eager_load = true + config.log_redis = true end diff --git a/config/environments/test.rb b/config/environments/test.rb index f5dae9550..0ad71405b 100644 --- a/config/environments/test.rb +++ b/config/environments/test.rb @@ -39,4 +39,5 @@ Rosa::Application.configure do config.assets.allow_debugging = true config.eager_load = false + config.log_redis = true end diff --git a/config/initializers/redis.rb b/config/initializers/redis.rb new file mode 100644 index 000000000..dc5b4736c --- /dev/null +++ b/config/initializers/redis.rb @@ -0,0 +1,11 @@ +class Redis + def self.connect! + opts = { url: "redis://localhost:6379/#{::Rails.env.test? ? 1 : 0}" } + + opts[:logger] = ::Rails.logger if ::Rails.application.config.log_redis + + Redis.current = Redis.new(opts) + end +end + +Redis.connect! \ No newline at end of file diff --git a/config/puma/production.rb b/config/puma/production.rb index 75d4ed9f1..8af785937 100644 --- a/config/puma/production.rb +++ b/config/puma/production.rb @@ -27,8 +27,8 @@ on_worker_boot do Rails.logger.info('Connected to PG') end - # Redis.connect! - # Rails.logger.info('Connected to Redis') + Redis.connect! + Rails.logger.info('Connected to Redis') end activate_control_app 'unix:///tmp/rosa_build_pumactl.sock' diff --git a/lib/abf_worker/build_lists_publish_task_manager.rb b/lib/abf_worker/build_lists_publish_task_manager.rb index 21884f005..95f4b1700 100644 --- a/lib/abf_worker/build_lists_publish_task_manager.rb +++ b/lib/abf_worker/build_lists_publish_task_manager.rb @@ -12,8 +12,7 @@ module AbfWorker end def initialize - @redis = Resque.redis - @workers_count = APP_CONFIG['abf_worker']['publish_workers_count'] + @workers_count = APP_CONFIG['abf_worker']['publish_workers_count'] end def run @@ -29,33 +28,33 @@ module AbfWorker if repository.platform.personal? Platform.main.each do |main_platform| key = "#{project.id}-#{repository.id}-#{main_platform.id}" - redis.lpush PROJECTS_FOR_CLEANUP, key + Redis.current.lpush PROJECTS_FOR_CLEANUP, key gather_old_packages project.id, repository.id, main_platform.id - redis.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key) + Redis.current.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key) gather_old_packages project.id, repository.id, main_platform.id, true end else key = "#{project.id}-#{repository.id}-#{repository.platform.id}" - redis.lpush PROJECTS_FOR_CLEANUP, key + Redis.current.lpush PROJECTS_FOR_CLEANUP, key gather_old_packages project.id, repository.id, repository.platform.id - redis.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key) + Redis.current.lpush PROJECTS_FOR_CLEANUP, ('testing-' << key) gather_old_packages project.id, repository.id, repository.platform.id, true end end def cleanup_completed(projects_for_cleanup) projects_for_cleanup.each do |key| - redis.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key - redis.hdel PACKAGES_FOR_CLEANUP, key + Redis.current.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key + Redis.current.hdel PACKAGES_FOR_CLEANUP, key end end def cleanup_failed(projects_for_cleanup) projects_for_cleanup.each do |key| - redis.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key - redis.lpush PROJECTS_FOR_CLEANUP, key + Redis.current.lrem LOCKED_PROJECTS_FOR_CLEANUP, 0, key + Redis.current.lpush PROJECTS_FOR_CLEANUP, key end end @@ -63,12 +62,12 @@ module AbfWorker return if build_lists.blank? rep_pl = "#{repository_id}-#{platform_id}" key = "#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{rep_pl}" - redis.sadd REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING, rep_pl - redis.sadd key, build_lists + Redis.current.sadd REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING, rep_pl + Redis.current.sadd key, build_lists end def unlock_build_list(build_list) - redis.lrem LOCKED_BUILD_LISTS, 0, build_list.id + Redis.current.lrem LOCKED_BUILD_LISTS, 0, build_list.id end def packages_structure @@ -77,10 +76,6 @@ module AbfWorker structure end - def redis - Resque.redis - end - def create_container_for(build_list) platform_path = "#{build_list.save_to_platform.path}/container/#{build_list.id}" system "rm -rf #{platform_path} && mkdir -p #{platform_path}" @@ -146,7 +141,7 @@ module AbfWorker } end key = (testing ? 'testing-' : '') << "#{project_id}-#{repository_id}-#{platform_id}" - redis.hset PACKAGES_FOR_CLEANUP, key, old_packages.to_json + Redis.current.hset PACKAGES_FOR_CLEANUP, key, old_packages.to_json end def fill_packages(bl, results_map, field = :sha1) @@ -216,14 +211,14 @@ module AbfWorker where(platforms: {platform_type: 'main'}).pluck(:repository_id) available_repos = available_repos.where('save_to_repository_id NOT IN (?)', locked_rep) unless locked_rep.empty? - for_cleanup = @redis.lrange(PROJECTS_FOR_CLEANUP, 0, -1).map do |key| + for_cleanup = Redis.current.lrange(PROJECTS_FOR_CLEANUP, 0, -1).map do |key| next if testing && key !~ /^testing-/ rep, pl = *key.split('-').last(2) locked_rep.present? && locked_rep.include?(rep.to_i) ? nil : [rep.to_i, pl.to_i] end.compact - for_cleanup_from_testing = @redis.smembers(REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING).map do |key| - next if @redis.scard("#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{key}") == 0 + for_cleanup_from_testing = Redis.current.smembers(REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING).map do |key| + next if Redis.current.scard("#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{key}") == 0 rep, pl = *key.split('-') locked_rep.present? && locked_rep.include?(rep.to_i) ? nil : [rep.to_i, pl.to_i] end.compact if testing @@ -240,7 +235,7 @@ module AbfWorker def create_rpm_build_task(save_to_repository_id, build_for_platform_id, testing) key = "#{save_to_repository_id}-#{build_for_platform_id}" - projects_for_cleanup = @redis.lrange(PROJECTS_FOR_CLEANUP, 0, -1).select do |k| + projects_for_cleanup = Redis.current.lrange(PROJECTS_FOR_CLEANUP, 0, -1).select do |k| (testing && k =~ /^testing-[\d]+-#{key}$/) || (!testing && k =~ /^[\d]+-#{key}$/) end @@ -257,15 +252,15 @@ module AbfWorker where(save_to_repository_id: save_to_repository_id). where(build_for_platform_id: build_for_platform_id). order(:updated_at) - locked_ids = @redis.lrange(LOCKED_BUILD_LISTS, 0, -1) + locked_ids = Redis.current.lrange(LOCKED_BUILD_LISTS, 0, -1) build_lists = build_lists.where('build_lists.id NOT IN (?)', locked_ids) unless locked_ids.empty? build_lists = build_lists.limit(150) old_packages = self.class.packages_structure projects_for_cleanup.each do |key| - @redis.lrem PROJECTS_FOR_CLEANUP, 0, key - packages = @redis.hget PACKAGES_FOR_CLEANUP, key + Redis.current.lrem PROJECTS_FOR_CLEANUP, 0, key + packages = Redis.current.hget PACKAGES_FOR_CLEANUP, key next unless packages packages = JSON.parse packages old_packages[:sources] |= packages['sources'] @@ -275,7 +270,7 @@ module AbfWorker end if testing - build_lists_for_cleanup_from_testing = @redis.smembers("#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{save_to_repository_id}-#{build_for_platform_id}") + build_lists_for_cleanup_from_testing = Redis.current.smembers("#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{save_to_repository_id}-#{build_for_platform_id}") BuildList.where(id: build_lists_for_cleanup_from_testing).each do |b| self.class.fill_packages(b, old_packages, :fullname) end if build_lists_for_cleanup_from_testing.present? @@ -343,7 +338,7 @@ module AbfWorker self.class.fill_packages(old_bl, old_packages, :fullname) } if testing build_list_ids << bl.id - @redis.lpush(LOCKED_BUILD_LISTS, bl.id) + Redis.current.lpush(LOCKED_BUILD_LISTS, bl.id) end packages[:sources] = new_sources.values.compact @@ -359,16 +354,16 @@ module AbfWorker ) projects_for_cleanup.each do |key| - @redis.lpush LOCKED_PROJECTS_FOR_CLEANUP, key + Redis.current.lpush LOCKED_PROJECTS_FOR_CLEANUP, key end rep_pl = "#{save_to_repository_id}-#{build_for_platform_id}" r_key = "#{BUILD_LISTS_FOR_CLEANUP_FROM_TESTING}-#{rep_pl}" build_lists_for_cleanup_from_testing.each do |key| - @redis.srem r_key, key + Redis.current.srem r_key, key end - if @redis.scard(r_key) == 0 - @redis.srem REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING, rep_pl + if Redis.current.scard(r_key) == 0 + Redis.current.srem REP_AND_PLS_OF_BUILD_LISTS_FOR_CLEANUP_FROM_TESTING, rep_pl end return true diff --git a/lib/abf_worker/model_helper.rb b/lib/abf_worker/model_helper.rb index 98d19e93d..2aba51664 100644 --- a/lib/abf_worker/model_helper.rb +++ b/lib/abf_worker/model_helper.rb @@ -39,7 +39,7 @@ module AbfWorker::ModelHelper def restart_job update_build_sets - Resque.redis.lpush "queue:#{worker_queue_with_priority}", + Redis.current.lpush "resque:queue:#{worker_queue_with_priority}", Resque.encode({'class' => worker_queue_class, 'args' => [abf_worker_args]}) end @@ -87,15 +87,15 @@ module AbfWorker::ModelHelper return unless is_a?(BuildList) key = mass_build_id ? MASS_BUILDS_SET : USER_BUILDS_SET - Resque.redis.pipelined do - Resque.redis.sadd key, mass_build_id || user_id - Resque.redis.sadd 'queues', worker_queue_with_priority + Redis.current.pipelined do + Redis.current.sadd key, mass_build_id || user_id + Redis.current.sadd 'resque:queues', worker_queue_with_priority end end def send_stop_signal - Resque.redis.setex( + Redis.current.setex( "#{service_queue}::live-inspector", 240, # Data will be removed from Redis after 240 sec. 'USR1' # Immediately kill child but don't exit diff --git a/lib/abf_worker/rpm_worker_observer.rb b/lib/abf_worker/rpm_worker_observer.rb index 34b3e7e15..0ca2f3105 100644 --- a/lib/abf_worker/rpm_worker_observer.rb +++ b/lib/abf_worker/rpm_worker_observer.rb @@ -57,11 +57,10 @@ module AbfWorker def restart_task return false if status != FAILED - redis = Resque.redis - if redis.lrem(RESTARTED_BUILD_LISTS, 0, subject.id) > 0 || (options['results'] || []).size > 1 + if Redis.current.lrem(RESTARTED_BUILD_LISTS, 0, subject.id) > 0 || (options['results'] || []).size > 1 return false else - redis.lpush RESTARTED_BUILD_LISTS, subject.id + Redis.current.lpush RESTARTED_BUILD_LISTS, subject.id subject.update_column(:status, BuildList::BUILD_PENDING) subject.restart_job if subject.external_nodes.blank? return true diff --git a/lib/abf_worker/status_inspector.rb b/lib/abf_worker/status_inspector.rb index d21c115c2..0106f3df3 100644 --- a/lib/abf_worker/status_inspector.rb +++ b/lib/abf_worker/status_inspector.rb @@ -21,9 +21,9 @@ module AbfWorker end def count_of_tasks(regexp) - Resque.redis.smembers('queues'). + Redis.current.smembers('resque:queues'). select{ |q| q =~ /#{regexp}/ }. - map{ |q| Resque.redis.llen("queue:#{q}") }.sum + map{ |q| Redis.current.llen("resque:queue:#{q}") }.sum end def products_status @@ -45,8 +45,8 @@ module AbfWorker end def status_of_worker(workers, worker) - redis, key = Resque.redis, "queue:#{worker}_worker" - default_tasks, tasks = redis.llen("#{key}_default"), redis.llen(key) + key = "resque:queue:#{worker}_worker" + default_tasks, tasks = Redis.current.llen("#{key}_default"), Redis.current.llen(key) { workers: workers.count, build_tasks: workers.select{ |w| w.working? }.count, diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 7514ace01..b53a44618 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -65,8 +65,9 @@ end def stub_redis @redis_instance = MockRedis.new - Redis.stub(:new).and_return { @redis_instance } - Redis::Store.stub(:new).and_return { @redis_instance } + allow(Redis).to receive(:new).and_return(@redis_instance) + allow(Redis).to receive(:current).and_return(@redis_instance) + allow(Redis::Store).to receive(:new).and_return(@redis_instance) end def fill_project project