[galaxy-commits] galaxy-dist commit db1ae0bc8995: Attempt to reduce costly queries in the job runner, especially when tracking jobs in the database.

commits-noreply at bitbucket.org commits-noreply at bitbucket.org
Fri Jul 23 13:42:45 EDT 2010


# HG changeset patch -- Bitbucket.org
# Project galaxy-dist
# URL http://bitbucket.org/galaxy/galaxy-dist/overview
# User Nate Coraor <nate at bx.psu.edu>
# Date 1279818642 14400
# Node ID db1ae0bc8995cb30deef73ce9d14ef175c512f90
# Parent  79d38503db4d860f9c9d62560c156fb0a2122b1b
Attempt to reduce costly queries in the job runner, especially when tracking jobs in the database.

--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -18,7 +18,7 @@ from Queue import Queue, Empty
 log = logging.getLogger( __name__ )
 
 # States for running a job. These are NOT the same as data states
-JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_OK, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ok', 'ready', 'deleted', 'admin_deleted'
+JOB_WAIT, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted'
 
 # This file, if created in the job's working directory, will be used for
 # setting advanced metadata properties on the job and its associated outputs.
@@ -106,14 +106,14 @@ class JobQueue( object ):
         for job in self.sa_session.query( model.Job ).filter( model.Job.state == model.Job.states.NEW ):
             if job.tool_id not in self.app.toolbox.tools_by_id:
                 log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
-                JobWrapper( job, None, self ).fail( 'This tool was disabled before the job completed.  Please contact your Galaxy administrator, or' )
+                JobWrapper( job, self ).fail( 'This tool was disabled before the job completed.  Please contact your Galaxy administrator, or' )
             else:
                 log.debug( "no runner: %s is still in new state, adding to the jobs queue" %job.id )
                 self.queue.put( ( job.id, job.tool_id ) )
         for job in self.sa_session.query( model.Job ).filter( ( model.Job.state == model.Job.states.RUNNING ) | ( model.Job.state == model.Job.states.QUEUED ) ):
             if job.tool_id not in self.app.toolbox.tools_by_id:
                 log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
-                JobWrapper( job, None, self ).fail( 'This tool was disabled before the job completed.  Please contact your Galaxy administrator, or' )
+                JobWrapper( job, self ).fail( 'This tool was disabled before the job completed.  Please contact your Galaxy administrator, or' )
             elif job.job_runner_name is None:
                 log.debug( "no runner: %s is still in queued state, adding to the jobs queue" %job.id )
                 if self.track_jobs_in_database:
@@ -121,7 +121,7 @@ class JobQueue( object ):
                 else:
                     self.queue.put( ( job.id, job.tool_id ) )
             else:
-                job_wrapper = JobWrapper( job, self.app.toolbox.tools_by_id[ job.tool_id ], self )
+                job_wrapper = JobWrapper( job, self )
                 self.dispatcher.recover( job, job_wrapper )
         if self.sa_session.dirty:
             self.sa_session.flush()
@@ -154,11 +154,12 @@ class JobQueue( object ):
         # Pull all new jobs from the queue at once
         new_jobs = []
         if self.track_jobs_in_database:
-            for j in self.sa_session.query( model.Job ) \
+            # Clear the session so we get fresh states for job and all datasets
+            self.sa_session.expunge_all()
+            # Fetch all new jobs
+            new_jobs = self.sa_session.query( model.Job ) \
                             .options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ) \
-                            .filter( model.Job.state == model.Job.states.NEW ):
-                job = JobWrapper( j, self.app.toolbox.tools_by_id[ j.tool_id ], self )
-                new_jobs.append( job )
+                            .filter( model.Job.state == model.Job.states.NEW ).all()
         else:
             try:
                 while 1:
@@ -168,8 +169,7 @@ class JobQueue( object ):
                     # Unpack the message
                     job_id, tool_id = message
                     # Create a job wrapper from it
-                    job_entity = self.sa_session.query( model.Job ).get( job_id )
-                    job = JobWrapper( job_entity, self.app.toolbox.tools_by_id[ tool_id ], self )
+                    job = self.sa_session.query( model.Job ).get( job_id )
                     # Append to watch queue
                     new_jobs.append( job )
             except Empty:
@@ -179,52 +179,43 @@ class JobQueue( object ):
         new_waiting = []
         for job in ( new_jobs + self.waiting ):
             try:
-                # Clear the session for each job so we get fresh states for
-                # job and all datasets
-                self.sa_session.expunge_all()
-                # Get the real job entity corresponding to the wrapper (if we
-                # are tracking in the database this is probably cached in
-                # the session from the origianl query above)
-                job_entity = self.sa_session.query( model.Job ).get( job.job_id )
+                # Since we don't expunge when not tracking jobs in the
+                # database, refresh the job here so it's not stale.
+                if not self.track_jobs_in_database:
+                    self.sa_session.refresh( job )
                 # Check the job's dependencies, requeue if they're not done                    
-                job_state = self.__check_if_ready_to_run( job, job_entity )
-                if job_state == JOB_WAIT: 
+                job_state = self.__check_if_ready_to_run( job )
+                if job_state == JOB_WAIT:
                     if not self.track_jobs_in_database:
                         new_waiting.append( job )
-                elif job_state == JOB_ERROR:
-                    log.info( "job %d ended with an error" % job.job_id )
                 elif job_state == JOB_INPUT_ERROR:
-                    log.info( "job %d unable to run: one or more inputs in error state" % job.job_id )
+                    log.info( "job %d unable to run: one or more inputs in error state" % job.id )
                 elif job_state == JOB_INPUT_DELETED:
-                    log.info( "job %d unable to run: one or more inputs deleted" % job.job_id )
+                    log.info( "job %d unable to run: one or more inputs deleted" % job.id )
                 elif job_state == JOB_READY:
                     if self.job_lock:
-                        log.info("Job dispatch attempted for %s, but prevented by administrative lock." % job.job_id)
+                        log.info( "Job dispatch attempted for %s, but prevented by administrative lock." % job.id )
                         if not self.track_jobs_in_database:
                             new_waiting.append( job )
                     else:
-                        self.dispatcher.put( job )
-                        log.debug( "job %d dispatched" % job.job_id)
+                        self.dispatcher.put( JobWrapper( job, self ) )
+                        log.info( "job %d dispatched" % job.id )
                 elif job_state == JOB_DELETED:
-                    msg = "job %d deleted by user while still queued" % job.job_id
-                    job.info = msg
-                    log.debug( msg )
+                    log.info( "job %d deleted by user while still queued" % job.id )
                 elif job_state == JOB_ADMIN_DELETED:
-                    job.fail( job_entity.info )
-                    log.info( "job %d deleted by admin while still queued" % job.job_id )
+                    job.info( "job %d deleted by admin while still queued" % job.id )
                 else:
-                    msg = "unknown job state '%s' for job %d" % ( job_state, job.job_id )
-                    job.info = msg
-                    log.error( msg )
+                    log.error( "unknown job state '%s' for job %d" % ( job_state, job.id ) )
+                    if not self.track_jobs_in_database:
+                        new_waiting.append( job )
             except Exception, e:
-                job.info = "failure running job %d: %s" % ( job.job_id, str( e ) )
-                log.exception( "failure running job %d" % job.job_id )
+                log.exception( "failure running job %d" % job.id )
         # Update the waiting list
         self.waiting = new_waiting
         # Done with the session
         self.sa_session.remove()
         
-    def __check_if_ready_to_run( self, job_wrapper, job ):
+    def __check_if_ready_to_run( self, job ):
         """
         Check if a job is ready to run by verifying that each of its input
         datasets is ready (specifically in the OK state). If any input dataset
@@ -244,11 +235,11 @@ class JobQueue( object ):
                 continue
             # don't run jobs for which the input dataset was deleted
             if idata.deleted:
-                job_wrapper.fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
+                JobWrapper( job, self ).fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
                 return JOB_INPUT_DELETED
             # an error in the input data causes us to bail immediately
             elif idata.state == idata.states.ERROR:
-                job_wrapper.fail( "input data %d is in error state" % ( idata.hid ) )
+                JobWrapper( job, self ).fail( "input data %d is in error state" % ( idata.hid ) )
                 return JOB_INPUT_ERROR
             elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
                 # need to requeue
@@ -280,11 +271,11 @@ class JobWrapper( object ):
     Wraps a 'model.Job' with convience methods for running processes and 
     state management.
     """
-    def __init__(self, job, tool, queue ):
+    def __init__( self, job, queue ):
         self.job_id = job.id
         self.session_id = job.session_id
         self.user_id = job.user_id
-        self.tool = tool
+        self.tool = queue.app.toolbox.tools_by_id.get( job.tool_id, None )
         self.queue = queue
         self.app = queue.app
         self.sa_session = self.app.model.context

--- a/lib/galaxy/tools/actions/__init__.py
+++ b/lib/galaxy/tools/actions/__init__.py
@@ -5,7 +5,6 @@ from galaxy.tools.parameters.grouping im
 from galaxy.util.template import fill_template
 from galaxy.util.none_like import NoneDataset
 from galaxy.web import url_for
-from galaxy.jobs import JOB_OK
 import galaxy.tools
 from types import *
 
@@ -353,7 +352,7 @@ class DefaultToolAction( object ):
             assert GALAXY_URL is not None, "GALAXY_URL parameter missing in tool config."
             redirect_url += "&GALAXY_URL=%s" % GALAXY_URL
             # Job should not be queued, so set state to ok
-            job.state = JOB_OK
+            job.state = trans.app.model.Job.states.OK
             job.info = "Redirected to: %s" % redirect_url
             trans.sa_session.add( job )
             trans.sa_session.flush()


More information about the galaxy-commits mailing list