[galaxy-commits] galaxy-dist commit db1ae0bc8995: Attempt to reduce costly queries in the job runner, especially when tracking jobs in the database.
commits-noreply at bitbucket.org
commits-noreply at bitbucket.org
Fri Jul 23 13:42:45 EDT 2010
# HG changeset patch -- Bitbucket.org
# Project galaxy-dist
# URL http://bitbucket.org/galaxy/galaxy-dist/overview
# User Nate Coraor <nate at bx.psu.edu>
# Date 1279818642 14400
# Node ID db1ae0bc8995cb30deef73ce9d14ef175c512f90
# Parent 79d38503db4d860f9c9d62560c156fb0a2122b1b
Attempt to reduce costly queries in the job runner, especially when tracking jobs in the database.
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -18,7 +18,7 @@ from Queue import Queue, Empty
log = logging.getLogger( __name__ )
# States for running a job. These are NOT the same as data states
-JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_OK, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ok', 'ready', 'deleted', 'admin_deleted'
+JOB_WAIT, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted'
# This file, if created in the job's working directory, will be used for
# setting advanced metadata properties on the job and its associated outputs.
@@ -106,14 +106,14 @@ class JobQueue( object ):
for job in self.sa_session.query( model.Job ).filter( model.Job.state == model.Job.states.NEW ):
if job.tool_id not in self.app.toolbox.tools_by_id:
log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
- JobWrapper( job, None, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' )
+ JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' )
else:
log.debug( "no runner: %s is still in new state, adding to the jobs queue" %job.id )
self.queue.put( ( job.id, job.tool_id ) )
for job in self.sa_session.query( model.Job ).filter( ( model.Job.state == model.Job.states.RUNNING ) | ( model.Job.state == model.Job.states.QUEUED ) ):
if job.tool_id not in self.app.toolbox.tools_by_id:
log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
- JobWrapper( job, None, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' )
+ JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' )
elif job.job_runner_name is None:
log.debug( "no runner: %s is still in queued state, adding to the jobs queue" %job.id )
if self.track_jobs_in_database:
@@ -121,7 +121,7 @@ class JobQueue( object ):
else:
self.queue.put( ( job.id, job.tool_id ) )
else:
- job_wrapper = JobWrapper( job, self.app.toolbox.tools_by_id[ job.tool_id ], self )
+ job_wrapper = JobWrapper( job, self )
self.dispatcher.recover( job, job_wrapper )
if self.sa_session.dirty:
self.sa_session.flush()
@@ -154,11 +154,12 @@ class JobQueue( object ):
# Pull all new jobs from the queue at once
new_jobs = []
if self.track_jobs_in_database:
- for j in self.sa_session.query( model.Job ) \
+ # Clear the session so we get fresh states for job and all datasets
+ self.sa_session.expunge_all()
+ # Fetch all new jobs
+ new_jobs = self.sa_session.query( model.Job ) \
.options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ) \
- .filter( model.Job.state == model.Job.states.NEW ):
- job = JobWrapper( j, self.app.toolbox.tools_by_id[ j.tool_id ], self )
- new_jobs.append( job )
+ .filter( model.Job.state == model.Job.states.NEW ).all()
else:
try:
while 1:
@@ -168,8 +169,7 @@ class JobQueue( object ):
# Unpack the message
job_id, tool_id = message
# Create a job wrapper from it
- job_entity = self.sa_session.query( model.Job ).get( job_id )
- job = JobWrapper( job_entity, self.app.toolbox.tools_by_id[ tool_id ], self )
+ job = self.sa_session.query( model.Job ).get( job_id )
# Append to watch queue
new_jobs.append( job )
except Empty:
@@ -179,52 +179,43 @@ class JobQueue( object ):
new_waiting = []
for job in ( new_jobs + self.waiting ):
try:
- # Clear the session for each job so we get fresh states for
- # job and all datasets
- self.sa_session.expunge_all()
- # Get the real job entity corresponding to the wrapper (if we
- # are tracking in the database this is probably cached in
- # the session from the origianl query above)
- job_entity = self.sa_session.query( model.Job ).get( job.job_id )
+ # Since we don't expunge when not tracking jobs in the
+ # database, refresh the job here so it's not stale.
+ if not self.track_jobs_in_database:
+ self.sa_session.refresh( job )
# Check the job's dependencies, requeue if they're not done
- job_state = self.__check_if_ready_to_run( job, job_entity )
- if job_state == JOB_WAIT:
+ job_state = self.__check_if_ready_to_run( job )
+ if job_state == JOB_WAIT:
if not self.track_jobs_in_database:
new_waiting.append( job )
- elif job_state == JOB_ERROR:
- log.info( "job %d ended with an error" % job.job_id )
elif job_state == JOB_INPUT_ERROR:
- log.info( "job %d unable to run: one or more inputs in error state" % job.job_id )
+ log.info( "job %d unable to run: one or more inputs in error state" % job.id )
elif job_state == JOB_INPUT_DELETED:
- log.info( "job %d unable to run: one or more inputs deleted" % job.job_id )
+ log.info( "job %d unable to run: one or more inputs deleted" % job.id )
elif job_state == JOB_READY:
if self.job_lock:
- log.info("Job dispatch attempted for %s, but prevented by administrative lock." % job.job_id)
+ log.info( "Job dispatch attempted for %s, but prevented by administrative lock." % job.id )
if not self.track_jobs_in_database:
new_waiting.append( job )
else:
- self.dispatcher.put( job )
- log.debug( "job %d dispatched" % job.job_id)
+ self.dispatcher.put( JobWrapper( job, self ) )
+ log.info( "job %d dispatched" % job.id )
elif job_state == JOB_DELETED:
- msg = "job %d deleted by user while still queued" % job.job_id
- job.info = msg
- log.debug( msg )
+ log.info( "job %d deleted by user while still queued" % job.id )
elif job_state == JOB_ADMIN_DELETED:
- job.fail( job_entity.info )
- log.info( "job %d deleted by admin while still queued" % job.job_id )
+ job.info( "job %d deleted by admin while still queued" % job.id )
else:
- msg = "unknown job state '%s' for job %d" % ( job_state, job.job_id )
- job.info = msg
- log.error( msg )
+ log.error( "unknown job state '%s' for job %d" % ( job_state, job.id ) )
+ if not self.track_jobs_in_database:
+ new_waiting.append( job )
except Exception, e:
- job.info = "failure running job %d: %s" % ( job.job_id, str( e ) )
- log.exception( "failure running job %d" % job.job_id )
+ log.exception( "failure running job %d" % job.id )
# Update the waiting list
self.waiting = new_waiting
# Done with the session
self.sa_session.remove()
- def __check_if_ready_to_run( self, job_wrapper, job ):
+ def __check_if_ready_to_run( self, job ):
"""
Check if a job is ready to run by verifying that each of its input
datasets is ready (specifically in the OK state). If any input dataset
@@ -244,11 +235,11 @@ class JobQueue( object ):
continue
# don't run jobs for which the input dataset was deleted
if idata.deleted:
- job_wrapper.fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
+ JobWrapper( job, self ).fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
return JOB_INPUT_DELETED
# an error in the input data causes us to bail immediately
elif idata.state == idata.states.ERROR:
- job_wrapper.fail( "input data %d is in error state" % ( idata.hid ) )
+ JobWrapper( job, self ).fail( "input data %d is in error state" % ( idata.hid ) )
return JOB_INPUT_ERROR
elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
# need to requeue
@@ -280,11 +271,11 @@ class JobWrapper( object ):
Wraps a 'model.Job' with convience methods for running processes and
state management.
"""
- def __init__(self, job, tool, queue ):
+ def __init__( self, job, queue ):
self.job_id = job.id
self.session_id = job.session_id
self.user_id = job.user_id
- self.tool = tool
+ self.tool = queue.app.toolbox.tools_by_id.get( job.tool_id, None )
self.queue = queue
self.app = queue.app
self.sa_session = self.app.model.context
--- a/lib/galaxy/tools/actions/__init__.py
+++ b/lib/galaxy/tools/actions/__init__.py
@@ -5,7 +5,6 @@ from galaxy.tools.parameters.grouping im
from galaxy.util.template import fill_template
from galaxy.util.none_like import NoneDataset
from galaxy.web import url_for
-from galaxy.jobs import JOB_OK
import galaxy.tools
from types import *
@@ -353,7 +352,7 @@ class DefaultToolAction( object ):
assert GALAXY_URL is not None, "GALAXY_URL parameter missing in tool config."
redirect_url += "&GALAXY_URL=%s" % GALAXY_URL
# Job should not be queued, so set state to ok
- job.state = JOB_OK
+ job.state = trans.app.model.Job.states.OK
job.info = "Redirected to: %s" % redirect_url
trans.sa_session.add( job )
trans.sa_session.flush()
More information about the galaxy-commits mailing list