abf/abf-worker#16: update Redis server for live logs

This commit is contained in:
Vokhmin Alexey V 2013-07-30 16:39:43 +04:00
parent 327277c2e1
commit 30803aa963
3 changed files with 73 additions and 56 deletions

View File

@ -9,6 +9,9 @@ common: &common
- 127.0.0.100 - 127.0.0.100
abf_worker: abf_worker:
publish_workers_count: 2 publish_workers_count: 2
log_server:
host: 127.0.0.1
port: 6379
keys: keys:
key_pair_secret_key: 'key_pair_secret_key' key_pair_secret_key: 'key_pair_secret_key'
airbrake_api_key: 'airbrake_api_key' airbrake_api_key: 'airbrake_api_key'

View File

@ -9,6 +9,9 @@ common: &common
- 127.0.0.100 - 127.0.0.100
abf_worker: abf_worker:
publish_workers_count: 2 publish_workers_count: 2
log_server:
host: 127.0.0.1
port: 6379
keys: keys:
key_pair_secret_key: 'key_pair_secret_key' key_pair_secret_key: 'key_pair_secret_key'
airbrake_api_key: 'airbrake_api_key' airbrake_api_key: 'airbrake_api_key'

View File

@ -1,59 +1,70 @@
module AbfWorker module AbfWorker::ModelHelper
module ModelHelper # In model which contains this helper should be:
# In model which contains this helper should be: # - #abf_worker_args
# - #abf_worker_args # - #build_canceled
# - #build_canceled
def abf_worker_log
Resque.redis.get(service_queue) || I18n.t('layout.build_lists.log.not_available')
end
def add_job_to_abf_worker_queue
Resque.push(
worker_queue_with_priority,
'class' => worker_queue_class,
'args' => [abf_worker_args]
)
end
def cancel_job
deleted = Resque::Job.destroy(
worker_queue_with_priority,
worker_queue_class,
abf_worker_args
)
if deleted == 1
build_canceled
else
send_stop_signal
end
true
end
def worker_queue_with_priority(queue = nil)
queue ||= abf_worker_base_queue
queue << '_' << abf_worker_priority if abf_worker_priority.present?
queue
end
def worker_queue_class(queue_class = nil)
queue_class ||= "AbfWorker::#{abf_worker_base_queue.classify}"
queue_class << abf_worker_priority.capitalize
end
private
def send_stop_signal
Resque.redis.setex(
"#{service_queue}::live-inspector",
240, # Data will be removed from Redis after 240 sec.
'USR1' # Immediately kill child but don't exit
)
end
def service_queue
"abfworker::#{abf_worker_base_queue.gsub(/\_/, '-')}-#{id}"
end
def self.included(base)
base.extend(ClassMethods)
end end
module ClassMethods
def log_server
@log_server ||= Redis.new(
:host => APP_CONFIG['abf_worker']['log_server']['host'],
:port => APP_CONFIG['abf_worker']['log_server']['port']
)
end
end
def abf_worker_log
self.class.log_server.get(service_queue) || I18n.t('layout.build_lists.log.not_available')
end
def add_job_to_abf_worker_queue
Resque.push(
worker_queue_with_priority,
'class' => worker_queue_class,
'args' => [abf_worker_args]
)
end
def cancel_job
deleted = Resque::Job.destroy(
worker_queue_with_priority,
worker_queue_class,
abf_worker_args
)
if deleted == 1
build_canceled
else
send_stop_signal
end
true
end
def worker_queue_with_priority(queue = nil)
queue ||= abf_worker_base_queue
queue << '_' << abf_worker_priority if abf_worker_priority.present?
queue
end
def worker_queue_class(queue_class = nil)
queue_class ||= "AbfWorker::#{abf_worker_base_queue.classify}"
queue_class << abf_worker_priority.capitalize
end
private
def send_stop_signal
Resque.redis.setex(
"#{service_queue}::live-inspector",
240, # Data will be removed from Redis after 240 sec.
'USR1' # Immediately kill child but don't exit
)
end
def service_queue
"abfworker::#{abf_worker_base_queue.gsub(/\_/, '-')}-#{id}"
end
end end