Added lock to job shifting

This commit is contained in:
Wedge 2016-03-22 16:12:27 +03:00
parent ac379fce04
commit 9c47ba64ba
9 changed files with 33 additions and 268 deletions

View File

@ -8,27 +8,28 @@ class Api::V1::JobsController < Api::V1::BaseController
skip_after_action :verify_authorized
def shift
job_shift_sem = Redis::Semaphore.new(:job_shift_lock)
job_shift_sem.lock
uid = BuildList.scoped_to_arch(arch_ids).
for_status([BuildList::BUILD_PENDING, BuildList::RERUN_TESTS]).
for_platform(platform_ids).pluck('DISTINCT user_id').sample
for_platform(platform_ids).where(builder: nil).pluck('DISTINCT user_id').sample
if uid
build_lists = BuildList.scoped_to_arch(arch_ids).
for_status([BuildList::BUILD_PENDING, BuildList::RERUN_TESTS]).
for_platform(platform_ids).where(user_id: uid).oldest.order(:created_at)
for_platform(platform_ids).where(user_id: uid).where(builder: nil).oldest.order(:created_at)
ActiveRecord::Base.transaction do
if current_user.system?
@build_list = build_lists.where(external_nodes: ["", nil]).first
@build_list ||= build_lists.external_nodes(:everything).first
else
@build_list = build_lists.external_nodes(:owned).for_user(current_user).first
@build_list ||= BuildListPolicy::Scope.new(current_user, build_lists).owned.
external_nodes(:everything).readonly(false).first
end
set_builder
if current_user.system?
@build_list = build_lists.where(external_nodes: ["", nil]).first
@build_list ||= build_lists.external_nodes(:everything).first
else
@build_list = build_lists.external_nodes(:owned).for_user(current_user).first
@build_list ||= BuildListPolicy::Scope.new(current_user, build_lists).owned.
external_nodes(:everything).readonly(false).first
end
set_builder
end
job_shift_sem.unlock
job = {
worker_queue: @build_list.worker_queue_with_priority(false),

View File

@ -0,0 +1,12 @@
module BuildLists
class ClearBuilderOnStaleBuildListsJob
@queue = :low
def self.perform
BuildList.where(["updated_at < ?", 120.seconds.ago]).where(status: BuildList::BUILD_PENDING).find_each(batch_size: 50) do |bl|
bl.builder = nil
bl.save
end
end
end
end

View File

@ -1,47 +0,0 @@
module BuildLists
class QueuesMonitoringJob
@queue = :middle
def self.perform
Redis.current.smembers('resque:queues').each do |key|
next if key !~ /(user|mass)_build_/
queue = "resque:queue:#{key}"
id = key.gsub(/[^\d]/, '')
if Redis.current.llen(queue) == 0
if key =~ /^user/
last_updated_at = BuildList.select(:updated_at).
where(user_id: id).order('updated_at DESC').first
else
last_updated_at = MassBuild.select(:updated_at).where(id: 250).first
end
last_updated_at = last_updated_at.try(:updated_at)
# cleans queue if no activity and tasks for this queue
clean(key) if !last_updated_at || (last_updated_at + 5.minutes) < Time.zone.now
else
# ensures that user/mass-build in the set from which we select next jobs
set_key = key =~ /^user/ ? BuildList::USER_BUILDS_SET : BuildList::MASS_BUILDS_SET
Redis.current.sadd set_key, id
end
end
end
def self.clean(key)
queue = "resque:queue:#{key}"
# See [#watch]: https://github.com/redis/redis-rb/blob/master/lib/redis.rb#L2012
Redis.current.watch(queue) do
if Redis.current.llen(queue) == 0
Redis.current.multi do |multi|
multi.del queue
multi.srem 'resque:queues', key
end
else
Redis.current.unwatch
end
end
end
end
end

View File

@ -1,11 +0,0 @@
class RestartNodesJob
@queue = :low
def self.perform
return if NodeInstruction.all_locked?
available_nodes = RpmBuildNode.all.map{ |n| n.user_id if n.user.try(:system?) }.compact.uniq
NodeInstruction.where(status: NodeInstruction::READY).
where.not(user_id: available_nodes).find_each(&:restart)
end
end

View File

@ -1,139 +0,0 @@
require 'cape'
require 'capistrano_colors'
set :default_environment, {
'LANG' => 'en_US.UTF-8'
}
require 'rvm/capistrano'
require 'bundler/capistrano'
require 'new_relic/recipes'
set :whenever_command, "bundle exec whenever"
require 'capistrano/ext/multistage'
# main details
ssh_options[:forward_agent] = true
default_run_options[:pty] = true
set :rvm_ruby_string, 'ruby-2.2.2@rosa_build'
set :application, "rosa_build"
set(:deploy_to) { "/srv/#{application}" }
set :user, "user"
set :use_sudo, false
set :keep_releases, 3
set :git_enable_submodules, 1
set :scm, :git
set :repository, "git@abf.rosalinux.ru:abf/rosa-build.git"
require './lib/recipes/nginx'
require 'puma/capistrano'
set :workers_count, 4
require './lib/recipes/resque'
require './lib/recipes/skype'
set :skype_topic, 'ABF' # Skype chat topic name
namespace :deploy do
task :symlink_all, :roles => :app do
run "mkdir -p #{fetch :shared_path}/config"
# Setup DB, application, newrelic
%w(database application newrelic).each do |config|
run "cp -n #{fetch :release_path}/config/#{config}.yml.sample #{fetch :shared_path}/config/#{config}.yml"
run "ln -nfs #{fetch :shared_path}/config/#{config}.yml #{fetch :release_path}/config/#{config}.yml"
end
# It will survive downloads folder between deployments
run "mkdir -p #{fetch :shared_path}/downloads"
run "ln -nfs #{fetch :shared_path}/downloads/ #{fetch :release_path}/public/downloads"
# It will survive sitemaps folder between deployments
run "mkdir -p #{fetch :shared_path}/sitemaps"
run "ln -nfs #{fetch :shared_path}/sitemaps #{fetch :release_path}/public/sitemaps"
end
task :symlink_pids, :roles => :app do
run "cd #{fetch :shared_path}/tmp && ln -nfs ../pids pids"
end
desc 'Copy compiled assets'
task :copy_assets, :roles => :app do
%w(new_application-*.css new_application-*.css.gz new_application-*.js new_application-*.js.gz).each do |f|
asset_glob = "#{fetch :shared_path}/assets/#{f}"
asset_file = capture %Q{ruby -e "print Dir.glob('#{asset_glob}').max_by { |file| File.mtime(file) }"}
puts asset_file
if asset_file
run "ln -fs #{asset_file} #{fetch :shared_path}/assets/#{ f.gsub('-*', '') }"
else
error "Error #{f} asset does not exist"
end
end
end
end
after "deploy:finalize_update", "deploy:symlink_all"
after "deploy:update_code", "deploy:migrate"
after "deploy:setup", "deploy:symlink_pids"
# Resque
after "deploy:stop", "resque:stop"
after "resque:stop", "resque:scheduler:stop"
after "deploy:start", "resque:start"
after "resque:start", "resque:scheduler:start"
after "deploy:restart", "resque:restart"
after "resque:restart", "resque:scheduler:restart"
after "deploy:restart", "deploy:cleanup"
after "deploy:cleanup", "deploy:copy_assets"
namespace :rake_tasks do
Cape do
mirror_rake_tasks 'db:seeds'
end
end
namespace :puma do
desc 'Restart puma'
task :restart, :roles => :app, :on_no_matching_servers => :continue do
begin
stop
rescue Capistrano::CommandError => ex
puts "Failed to restart puma: #{ex}\nAssuming not started."
ensure
start
end
end
end
namespace :update do
desc "Copy remote production shared files to localhost"
task :shared do
run_locally "rsync --recursive --times --rsh=ssh --compress --human-readable --progress #{user}@#{domain}:#{shared_path}/shared_contents/uploads public/uploads"
end
desc "Dump remote production postgresql database, rsync to localhost"
task :postgresql do
get("#{current_path}/config/database.yml", "tmp/database.yml")
remote_settings = YAML::load_file("tmp/database.yml")[rails_env]
local_settings = YAML::load_file("config/database.yml")["development"]
run "export PGPASSWORD=#{remote_settings["password"]} && pg_dump --host=#{remote_settings["host"]} --port=#{remote_settings["port"]} --username #{remote_settings["username"]} --file #{current_path}/tmp/#{remote_settings["database"]}_dump -Fc #{remote_settings["database"]}"
run_locally "rsync --recursive --times --rsh=ssh --compress --human-readable --progress #{user}@#{domain}:#{current_path}/tmp/#{remote_settings["database"]}_dump tmp/"
run_locally "dropdb -U #{local_settings["username"]} --host=#{local_settings["host"]} --port=#{local_settings["port"]} #{local_settings["database"]}"
run_locally "createdb -U #{local_settings["username"]} --host=#{local_settings["host"]} --port=#{local_settings["port"]} -T template0 #{local_settings["database"]}"
run_locally "pg_restore -U #{local_settings["username"]} --host=#{local_settings["host"]} --port=#{local_settings["port"]} -d #{local_settings["database"]} tmp/#{remote_settings["database"]}_dump"
end
desc "Dump all remote data to localhost"
task :all do
# update.shared
update.postgresql
end
end

View File

@ -1,10 +0,0 @@
require "whenever/capistrano"
set :rvm_ruby_string, 'ruby-2.2.2@rosa-build'
set :branch, "master"
set :domain, "0.0.0.0"
role :app, domain
role :web, domain
role :db, domain, primary: true

View File

@ -1,22 +0,0 @@
# Html5BoilerplateHelper will first check your ENV and
# then this yml file for these values. If they are both
# empty, the google code block will not be used.
#
# e.g. ENV['GOOGLE_ACCOUNT_ID'] || yml[:google_account_id]
# e.g. ENV['GOOGLE_API_KEY'] || yml[:google_api_key]
#
defaults: &defaults
:google_account_id: ''
:google_api_key: ''
:development:
<<: *defaults
:test:
<<: *defaults
:staging:
<<: *defaults
:production:
<<: *defaults

View File

@ -12,13 +12,6 @@ build_lists_publish_task_manager:
queue: middle
description: 'Creates tasks for publishing'
build_lists_queues_monitoring:
every:
- '1m'
class: 'BuildLists::QueuesMonitoringJob'
queue: middle
description: 'Monitoring for "user/mass-build" queues'
clean_api_defender_statistics:
every:
- '1d'
@ -38,4 +31,11 @@ run_extra_mass_builds:
- '5m'
class: 'RunExtraMassBuildsJob'
queue: low
description: 'Run mass builds with relations'
description: 'Run mass builds with relations'
clear_builder_on_stale_buildlists:
every:
- '2m'
class: 'BuildLists::ClearBuilderOnStaleBuildListsJob'
queue: low
description: 'Clear builder on build lists which are still pending'

View File

@ -1,19 +0,0 @@
require 'ffi'
module RPM
module C
extend ::FFI::Library
begin
ffi_lib ['librpm.so.3', 'librpm.so.2', 'librpm.so.1', 'librpm-5.4.so', 'rpm']
rescue LoadError => e
raise(
"Can't find rpm libs on your system: #{e.message}"
)
end
attach_function 'rpmEVRcmp', [:string, :string], :int
end
end