Diffstat (limited to 'Bugzilla/JobQueue.pm')
-rw-r--r--  Bugzilla/JobQueue.pm | 210
1 file changed, 106 insertions, 104 deletions
diff --git a/Bugzilla/JobQueue.pm b/Bugzilla/JobQueue.pm
index 6ff85d84f..e48182007 100644
--- a/Bugzilla/JobQueue.pm
+++ b/Bugzilla/JobQueue.pm
@@ -21,153 +21,155 @@ use fields qw(_worker_pidfile);
# This maps job names for Bugzilla::JobQueue to the appropriate modules.
# If you add new types of jobs, you should add a mapping here.
-use constant JOB_MAP => {
- send_mail => 'Bugzilla::Job::Mailer',
- bug_mail => 'Bugzilla::Job::BugMail',
-};
+use constant JOB_MAP =>
+ {send_mail => 'Bugzilla::Job::Mailer', bug_mail => 'Bugzilla::Job::BugMail',};
# Without a driver cache TheSchwartz opens a new database connection
# for each email it sends. This cached connection doesn't persist
# across requests.
-use constant DRIVER_CACHE_TIME => 300; # 5 minutes
+use constant DRIVER_CACHE_TIME => 300; # 5 minutes
# To avoid memory leak/fragmentation, a worker process won't process more than
# MAX_MESSAGES messages.
use constant MAX_MESSAGES => 1000;
sub job_map {
- if (!defined(Bugzilla->request_cache->{job_map})) {
- my $job_map = JOB_MAP;
- Bugzilla::Hook::process('job_map', { job_map => $job_map });
- Bugzilla->request_cache->{job_map} = $job_map;
- }
-
- return Bugzilla->request_cache->{job_map};
+ if (!defined(Bugzilla->request_cache->{job_map})) {
+ my $job_map = JOB_MAP;
+ Bugzilla::Hook::process('job_map', {job_map => $job_map});
+ Bugzilla->request_cache->{job_map} = $job_map;
+ }
+
+ return Bugzilla->request_cache->{job_map};
}
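For reference, the job_map hook processed above is how extensions register additional job types. A minimal sketch, assuming a hypothetical "Example" extension and a hypothetical job module name (neither is part of this patch):

  # extensions/Example/Extension.pm (hypothetical extension)
  package Bugzilla::Extension::Example;
  use strict;
  use warnings;
  use parent qw(Bugzilla::Extension);

  sub job_map {
    my ($self, $args) = @_;

    # Map a new job name onto the module that will process it.
    # 'example_ping' and the Job::Ping module are illustrative names.
    $args->{job_map}->{example_ping} = 'Bugzilla::Extension::Example::Job::Ping';
  }

  __PACKAGE__->NAME;

Jobs registered this way can then be queued by name, exactly like the built-in send_mail and bug_mail entries in JOB_MAP.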
sub new {
- my $class = shift;
-
- if (!Bugzilla->feature('jobqueue')) {
- ThrowUserError('feature_disabled', { feature => 'jobqueue' });
- }
-
- my $lc = Bugzilla->localconfig;
- # We need to use the main DB as TheSchwartz module is going
- # to write to it.
- my $self = $class->SUPER::new(
- databases => [{
- dsn => Bugzilla->dbh_main->{private_bz_dsn},
- user => $lc->{db_user},
- pass => $lc->{db_pass},
- prefix => 'ts_',
- }],
- driver_cache_expiration => DRIVER_CACHE_TIME,
- prioritize => 1,
- );
-
- return $self;
+ my $class = shift;
+
+ if (!Bugzilla->feature('jobqueue')) {
+ ThrowUserError('feature_disabled', {feature => 'jobqueue'});
+ }
+
+ my $lc = Bugzilla->localconfig;
+
+ # We need to use the main DB as TheSchwartz module is going
+ # to write to it.
+ my $self = $class->SUPER::new(
+ databases => [{
+ dsn => Bugzilla->dbh_main->{private_bz_dsn},
+ user => $lc->{db_user},
+ pass => $lc->{db_pass},
+ prefix => 'ts_',
+ }],
+ driver_cache_expiration => DRIVER_CACHE_TIME,
+ prioritize => 1,
+ );
+
+ return $self;
}
# A way to get access to the underlying databases directly.
sub bz_databases {
- my $self = shift;
- my @hashes = keys %{ $self->{databases} };
- return map { $self->driver_for($_) } @hashes;
+ my $self = shift;
+ my @hashes = keys %{$self->{databases}};
+ return map { $self->driver_for($_) } @hashes;
}
# inserts a job into the queue to be processed and returns immediately
sub insert {
- my $self = shift;
- my $job = shift;
-
- if (!ref($job)) {
- my $mapped_job = Bugzilla::JobQueue->job_map()->{$job};
- ThrowCodeError('jobqueue_no_job_mapping', { job => $job })
- if !$mapped_job;
-
- $job = new TheSchwartz::Job(
- funcname => $mapped_job,
- arg => $_[0],
- priority => $_[1] || 5
- );
- }
-
- my $retval = $self->SUPER::insert($job);
- # XXX Need to get an error message here if insert fails, but
- # I don't see any way to do that in TheSchwartz.
- ThrowCodeError('jobqueue_insert_failed', { job => $job, errmsg => $@ })
- if !$retval;
-
- return $retval;
+ my $self = shift;
+ my $job = shift;
+
+ if (!ref($job)) {
+ my $mapped_job = Bugzilla::JobQueue->job_map()->{$job};
+ ThrowCodeError('jobqueue_no_job_mapping', {job => $job}) if !$mapped_job;
+
+ $job = new TheSchwartz::Job(
+ funcname => $mapped_job,
+ arg => $_[0],
+ priority => $_[1] || 5
+ );
+ }
+
+ my $retval = $self->SUPER::insert($job);
+
+ # XXX Need to get an error message here if insert fails, but
+ # I don't see any way to do that in TheSchwartz.
+ ThrowCodeError('jobqueue_insert_failed', {job => $job, errmsg => $@})
+ if !$retval;
+
+ return $retval;
}
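For context, insert() accepts either a job name known to job_map() or a ready-made TheSchwartz::Job object. A caller-side sketch; the argument hashrefs are illustrative and not defined by this module:

  # By name: looked up in job_map() and wrapped in a TheSchwartz::Job
  # (priority defaults to 5 when the third argument is omitted).
  Bugzilla->job_queue->insert('send_mail', {msg => $message});

  # With an explicit priority as the third argument.
  Bugzilla->job_queue->insert('bug_mail', {vars => $vars}, 2);

  # Or hand over a pre-built TheSchwartz::Job directly.
  my $job = TheSchwartz::Job->new(
    funcname => 'Bugzilla::Job::Mailer',
    arg      => {msg => $message},    # illustrative payload
  );
  Bugzilla->job_queue->insert($job);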
# To avoid memory leaks/fragmentation, which tend to happen for long-running
# perl processes, check for jobs and spawn a new process to empty the queue.
sub subprocess_worker {
- my $self = shift;
-
- my $command = "$0 -d -p '" . $self->{_worker_pidfile} . "' onepass";
-
- while (1) {
- my $time = (time);
- my @jobs = $self->list_jobs({
- funcname => $self->{all_abilities},
- run_after => $time,
- grabbed_until => $time,
- limit => 1,
- });
- if (@jobs) {
- $self->debug("Spawning queue worker process");
- # Run the worker as a daemon
- system $command;
- # And poll the PID to detect when the worker has finished.
- # We do this instead of system() to allow for the INT signal to
- # interrupt us and trigger kill_worker().
- my $pid = read_text($self->{_worker_pidfile}, err_mode => 'quiet');
- if ($pid) {
- sleep(3) while(kill(0, $pid));
- }
- $self->debug("Queue worker process completed");
- } else {
- $self->debug("No jobs found");
- }
- sleep(5);
+ my $self = shift;
+
+ my $command = "$0 -d -p '" . $self->{_worker_pidfile} . "' onepass";
+
+ while (1) {
+ my $time = (time);
+ my @jobs = $self->list_jobs({
+ funcname => $self->{all_abilities},
+ run_after => $time,
+ grabbed_until => $time,
+ limit => 1,
+ });
+ if (@jobs) {
+ $self->debug("Spawning queue worker process");
+
+ # Run the worker as a daemon
+ system $command;
+
+ # And poll the PID to detect when the worker has finished.
+ # We do this instead of system() to allow for the INT signal to
+ # interrupt us and trigger kill_worker().
+ my $pid = read_text($self->{_worker_pidfile}, err_mode => 'quiet');
+ if ($pid) {
+ sleep(3) while (kill(0, $pid));
+ }
+ $self->debug("Queue worker process completed");
}
+ else {
+ $self->debug("No jobs found");
+ }
+ sleep(5);
+ }
}
sub kill_worker {
- my $self = Bugzilla->job_queue();
- if ($self->{_worker_pidfile} && -e $self->{_worker_pidfile}) {
- my $worker_pid = read_text($self->{_worker_pidfile});
- if ($worker_pid && kill(0, $worker_pid)) {
- $self->debug("Stopping worker process");
- system "$0 -f -p '" . $self->{_worker_pidfile} . "' stop";
- }
+ my $self = Bugzilla->job_queue();
+ if ($self->{_worker_pidfile} && -e $self->{_worker_pidfile}) {
+ my $worker_pid = read_text($self->{_worker_pidfile});
+ if ($worker_pid && kill(0, $worker_pid)) {
+ $self->debug("Stopping worker process");
+ system "$0 -f -p '" . $self->{_worker_pidfile} . "' stop";
}
+ }
}
sub set_pidfile {
- my ($self, $pidfile) = @_;
- $self->{_worker_pidfile} = bz_locations->{'datadir'} .
- '/worker-' . basename($pidfile);
+ my ($self, $pidfile) = @_;
+ $self->{_worker_pidfile}
+ = bz_locations->{'datadir'} . '/worker-' . basename($pidfile);
}
# Clear the request cache at the start of each run.
sub work_once {
- my $self = shift;
- Bugzilla->clear_request_cache();
- return $self->SUPER::work_once(@_);
+ my $self = shift;
+ Bugzilla->clear_request_cache();
+ return $self->SUPER::work_once(@_);
}
# Never process more than MAX_MESSAGES in one batch, to avoid memory
# leak/fragmentation issues.
sub work_until_done {
- my $self = shift;
- my $count = 0;
- while ($count++ < MAX_MESSAGES) {
- $self->work_once or last;
- }
+ my $self = shift;
+ my $count = 0;
+ while ($count++ < MAX_MESSAGES) {
+ $self->work_once or last;
+ }
}
1;
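For reference, each module named in JOB_MAP (or added via the job_map hook) is a TheSchwartz worker class. A rough sketch of such a module against the TheSchwartz::Worker interface, with a hypothetical package name and a made-up task:

  package Bugzilla::Extension::Example::Job::Ping;    # hypothetical
  use strict;
  use warnings;
  use parent qw(TheSchwartz::Worker);

  # How long a grabbed job stays reserved before TheSchwartz may hand it out again.
  use constant grab_for => 300;

  sub work {
    my ($class, $job) = @_;
    my $arg = $job->arg;    # the hashref passed to Bugzilla->job_queue->insert()

    eval {
      # ... do the actual work with $arg here (illustrative placeholder) ...
      1;
    } or return $job->failed($@);

    $job->completed;
  }

  1;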