[galaxy-dev] [hg] galaxy 1731: Stop jobs if any of their output datasets grow...

Greg Von Kuster greg at bx.psu.edu
Thu Feb 5 16:52:08 EST 2009


details:   http://www.bx.psu.edu/hg/galaxy/rev/e26741c8c642
changeset: 1731:e26741c8c642
user:      Nate Coraor <nate at bx.psu.edu>
date:      Tue Feb 03 14:15:09 2009 -0500
description:
Stop jobs if any of their output datasets grow larger than a defined limit

5 file(s) affected in this change:

lib/galaxy/config.py
lib/galaxy/jobs/__init__.py
lib/galaxy/jobs/runners/local.py
lib/galaxy/jobs/runners/pbs.py
lib/galaxy/tools/__init__.py

diffs (309 lines):

diff -r 0424f713d58d -r e26741c8c642 lib/galaxy/config.py
--- a/lib/galaxy/config.py	Tue Feb 03 14:14:02 2009 -0500
+++ b/lib/galaxy/config.py	Tue Feb 03 14:15:09 2009 -0500
@@ -47,6 +47,7 @@
         self.cluster_files_directory = os.path.abspath( kwargs.get( "cluster_files_directory", "database/pbs" ) )
         self.job_working_directory = resolve_path( kwargs.get( "job_working_directory", "database/job_working_directory" ), self.root )
         self.outputs_to_working_directory = string_as_bool( kwargs.get( 'outputs_to_working_directory', False ) )
+        self.output_size_limit = int( kwargs.get( 'output_size_limit', 0 ) )
         self.admin_pass = kwargs.get('admin_pass',"galaxy")
         self.sendmail_path = kwargs.get('sendmail_path',"/usr/sbin/sendmail")
         self.mailing_join_addr = kwargs.get('mailing_join_addr',"galaxy-user-join at bx.psu.edu")
diff -r 0424f713d58d -r e26741c8c642 lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py	Tue Feb 03 14:14:02 2009 -0500
+++ b/lib/galaxy/jobs/__init__.py	Tue Feb 03 14:15:09 2009 -0500
@@ -305,6 +305,7 @@
         # and job recovery fail.
         self.working_directory = \
             os.path.join( self.app.config.job_working_directory, str( self.job_id ) )
+        self.output_paths = None
         
     def get_param_dict( self ):
         """
@@ -342,7 +343,7 @@
         incoming['userId'] = userId
         incoming['userEmail'] = userEmail
         # Build params, done before hook so hook can use
-        param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.working_directory )
+        param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames() )
         # Certain tools require tasks to be completed prior to job execution
         # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ).
         if self.tool.tool_type is not None:
@@ -371,7 +372,7 @@
         self.param_dict = param_dict
         self.extra_filenames = extra_filenames
         return extra_filenames
-        
+
     def fail( self, message, exception=False ):
         """
         Indicate job failure by setting state and message on all output 
@@ -385,14 +386,14 @@
             # Do this first in case we generate a traceback below
             if exception:
                 job.traceback = traceback.format_exc()
-            for dataset_assoc in job.output_datasets:
-                if self.app.config.outputs_to_working_directory:
-                    false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % dataset_assoc.dataset.dataset.id ) )
+            if self.app.config.outputs_to_working_directory:
+                for dataset_path in self.get_output_fnames():
                     try:
-                        shutil.move( false_path, dataset_assoc.dataset.file_name )
-                        log.debug( "fail(): Moved %s to %s" % ( false_path, dataset_assoc.dataset.file_name ) )
+                        shutil.move( dataset_path.false_path, dataset_path.real_path )
+                        log.debug( "fail(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
                     except ( IOError, OSError ), e:
                         log.error( "fail(): Missing output file in working directory: %s" % e )
+            for dataset_assoc in job.output_datasets:
                 dataset = dataset_assoc.dataset
                 dataset.refresh()
                 dataset.state = dataset.states.ERROR
@@ -452,15 +453,15 @@
             job.state = "error"
         else:
             job.state = 'ok'
+        if self.app.config.outputs_to_working_directory:
+            for dataset_path in self.get_output_fnames():
+                try:
+                    shutil.move( dataset_path.false_path, dataset_path.real_path )
+                    log.debug( "finish(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
+                except ( IOError, OSError ):
+                    self.fail( "Job %s's output dataset(s) could not be read" % job.id )
+                    return
         for dataset_assoc in job.output_datasets:
-            if self.app.config.outputs_to_working_directory:
-                false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % dataset_assoc.dataset.dataset.id ) )
-                try:
-                    shutil.move( false_path, dataset_assoc.dataset.file_name )
-                    log.debug( "finish(): Moved %s to %s" % ( false_path, dataset_assoc.dataset.file_name ) )
-                except ( IOError, OSError ):
-                    self.fail( "The job's output dataset(s) could not be read" )
-                    return
             for dataset in dataset_assoc.dataset.dataset.history_associations: #need to update all associated output hdas, i.e. history was shared with job running
                 dataset.blurb = 'done'
                 dataset.peek  = 'no peek'
@@ -519,7 +520,7 @@
             for fname in self.extra_filenames: 
                 os.remove( fname )
             if self.working_directory is not None:
-                os.rmdir( self.working_directory ) 
+                shutil.rmtree( self.working_directory )
         except:
             log.exception( "Unable to cleanup job %d" % self.job_id )
         
@@ -543,9 +544,36 @@
         return filenames
 
     def get_output_fnames( self ):
+        if self.output_paths is not None:
+            return self.output_paths
+
+        class DatasetPath( object ):
+            def __init__( self, real_path, false_path = None ):
+                self.real_path = real_path
+                self.false_path = false_path
+            def __str__( self ):
+                if false_path is None:
+                    return self.real_path
+                else:
+                    return self.false_path
+
         job = model.Job.get( self.job_id )
-        return [ da.dataset.file_name for da in job.output_datasets ]
+        if self.app.config.outputs_to_working_directory:
+            self.output_paths = []
+            for name, data in [ ( da.name, da.dataset ) for da in job.output_datasets ]:
+                false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % data.id ) )
+                self.output_paths.append( DatasetPath( data.file_name, false_path ) )
+        else:
+            self.output_paths = [ DatasetPath( da.dataset.file_name ) for da in job.output_datasets ]
+        return self.output_paths
 
+    def check_output_sizes( self ):
+        sizes = []
+        output_paths = self.get_output_fnames()
+        for outfile in [ str( o ) for o in output_paths ]:
+            sizes.append( ( outfile, os.stat( outfile ).st_size ) )
+        return sizes
+        
 class DefaultJobDispatcher( object ):
     def __init__( self, app ):
         self.app = app
diff -r 0424f713d58d -r e26741c8c642 lib/galaxy/jobs/runners/local.py
--- a/lib/galaxy/jobs/runners/local.py	Tue Feb 03 14:14:02 2009 -0500
+++ b/lib/galaxy/jobs/runners/local.py	Tue Feb 03 14:15:09 2009 -0500
@@ -4,6 +4,7 @@
 import threading
 
 from galaxy import model
+from galaxy.datatypes.data import nice_size
 
 import os, errno
 from time import sleep
@@ -68,9 +69,31 @@
                                          env = env,
                                          preexec_fn = os.setpgrp )
                 job_wrapper.set_runner( 'local:///', proc.pid )
+                if self.app.config.output_size_limit > 0:
+                    sleep_time = 1
+                    while proc.poll() is None:
+                        for outfile, size in job_wrapper.check_output_sizes():
+                            if size > self.app.config.output_size_limit:
+                                # Error the job immediately
+                                job_wrapper.fail( 'Job output grew too large (greater than %s), please try different job parameters or' \
+                                    % nice_size( self.app.config.output_size_limit ) )
+                                log.warning( 'Terminating job %s due to output %s growing larger than %s limit' \
+                                    % ( job_wrapper.job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) )
+                                # Then kill it
+                                os.killpg( proc.pid, 15 )
+                                sleep( 1 )
+                                if proc.poll() is None:
+                                    os.killpg( proc.pid, 9 )
+                                proc.wait() # reap
+                                log.debug( 'Job %s (pid %s) terminated' % ( job_wrapper.job_id, proc.pid ) )
+                                return
+                            sleep( sleep_time )
+                            if sleep_time < 8:
+                                # So we don't stat every second
+                                sleep_time *= 2
                 stdout = proc.stdout.read() 
                 stderr = proc.stderr.read()
-                proc.wait()
+                proc.wait() # reap
                 log.debug('execution finished: %s' % command_line)
             except Exception, exc:
                 job_wrapper.fail( "failure running job", exception=True )
diff -r 0424f713d58d -r e26741c8c642 lib/galaxy/jobs/runners/pbs.py
--- a/lib/galaxy/jobs/runners/pbs.py	Tue Feb 03 14:14:02 2009 -0500
+++ b/lib/galaxy/jobs/runners/pbs.py	Tue Feb 03 14:15:09 2009 -0500
@@ -2,6 +2,8 @@
 from Queue import Queue, Empty
 
 from galaxy import model
+from galaxy.datatypes.data import nice_size
+
 from paste.deploy.converters import asbool
 
 import pkg_resources
@@ -60,6 +62,7 @@
         self.ofile = None
         self.efile = None
         self.runner_url = None
+        self.check_count = 0
 
 class PBSJobRunner( object ):
     """
@@ -71,6 +74,8 @@
         # Check if PBS was importable, fail if not
         if pbs is None:
             raise Exception( "PBSJobRunner requires pbs-python which was not found" )
+        if app.config.pbs_application_server and app.config.outputs_to_working_directory:
+            raise Exception( "pbs_application_server (file staging) and outputs_to_working_directory options are mutually exclusive" )
         self.app = app
         # 'watched' and 'queue' are both used to keep track of jobs to watch.
         # 'queue' is used to add new watched jobs, and can be called from
@@ -132,6 +137,8 @@
                     self.queue_job( obj )
                 elif op == 'finish':
                     self.finish_job( obj )
+                elif op == 'fail_oversize_job':
+                    self.fail_oversize_job( obj )
             except:
                 log.exception( "Uncaught exception %sing job" % op )
 
@@ -173,8 +180,9 @@
         if self.app.config.pbs_application_server:
             pbs_ofile = self.app.config.pbs_application_server + ':' + ofile
             pbs_efile = self.app.config.pbs_application_server + ':' + efile
-            stagein = self.get_stage_in_out( job_wrapper.get_input_fnames() + job_wrapper.get_output_fnames(), symlink=True )
-            stageout = self.get_stage_in_out( job_wrapper.get_output_fnames() )
+            output_files = [ str( o ) for o in job_wrapper.get_output_fnames() ]
+            stagein = self.get_stage_in_out( job_wrapper.get_input_fnames() + output_files, symlink=True )
+            stageout = self.get_stage_in_out( output_files )
             job_attrs = pbs.new_attropl(5)
             job_attrs[0].name = pbs.ATTR_o
             job_attrs[0].value = pbs_ofile
@@ -298,6 +306,20 @@
                 if state == "R" and not pbs_job_state.running:
                     pbs_job_state.running = True
                     pbs_job_state.job_wrapper.change_state( "running" )
+                if self.app.config.output_size_limit > 0 and state == "R" and (pbs_job_state.check_count % 10) == 0:
+                    # Every 10th time a job is checked, check the size of its outputs.
+                    fail = False
+                    for outfile, size in pbs_job_state.job_wrapper.check_output_sizes():
+                        if size > self.app.config.output_size_limit:
+                            pbs_job_state.fail_message = 'Job output grew too large (greater than %s), please try different job parameters or' \
+                                % nice_size( self.app.config.output_size_limit )
+                            log.warning( '(%s/%s) Dequeueing job due to output %s growing larger than %s limit' \
+                                % ( galaxy_job_id, job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) )
+                            self.work_queue.put( ( 'fail_oversize_job', pbs_job_state ) )
+                            fail = True
+                            break
+                    if fail:
+                        continue
                 pbs_job_state.old_state = state
                 new_watched.append( pbs_job_state )
             else:
@@ -329,6 +351,7 @@
             pbs_server_name = self.determine_pbs_server( pbs_job_state.runner_url )
             if pbs_server_name not in servers:
                 servers.append( pbs_server_name )
+            pbs_job_state.check_count += 1
         for pbs_server_name in servers:
             c = pbs.pbs_connect( pbs_server_name )
             if c <= 0:
@@ -386,6 +409,14 @@
         # clean up the pbs files
         self.cleanup( ( ofile, efile, job_file ) )
 
+    def fail_oversize_job( self, pbs_job_state ):
+        """
+        Seperated out so we can use the worker threads for it.
+        """
+        self.stop_job( self.app.model.Job.get( pbs_job_state.job_wrapper.job_id ) )
+        pbs_job_state.job_wrapper.fail( pbs_job_state.fail_message )
+        self.cleanup( ( pbs_job_state.ofile, pbs_job_state.efile, pbs_job_state.job_file ) )
+
     def cleanup( self, files ):
         if not asbool( self.app.config.get( 'debug', False ) ):
             for file in files:
@@ -431,7 +462,7 @@
             return
         pbs.pbs_deljob( c, str( job.job_runner_external_id ), 'NULL' )
         pbs.pbs_disconnect( c )
-        log.debug( "(%s/%s) Removed from PBS queue at user's request" % ( job.id, job.job_runner_external_id ) )
+        log.debug( "(%s/%s) Removed from PBS queue before job completion" % ( job.id, job.job_runner_external_id ) )
 
     def recover( self, job, job_wrapper ):
         """Recovers jobs stuck in the queued/running state when Galaxy started"""
diff -r 0424f713d58d -r e26741c8c642 lib/galaxy/tools/__init__.py
--- a/lib/galaxy/tools/__init__.py	Tue Feb 03 14:14:02 2009 -0500
+++ b/lib/galaxy/tools/__init__.py	Tue Feb 03 14:15:09 2009 -0500
@@ -1045,7 +1045,7 @@
                     input.validate( value, None )
                     input_values[ input.name ] = value
     
-    def build_param_dict( self, incoming, input_datasets, output_datasets, working_directory = None ):
+    def build_param_dict( self, incoming, input_datasets, output_datasets, output_paths ):
         """
         Build the dictionary of parameters for substituting into the command
         line. Each value is wrapped in a `InputValueWrapper`, which allows
@@ -1098,10 +1098,14 @@
                     param_dict[ "_CHILD___%s___%s" % ( name, child.designation ) ] = DatasetFilenameWrapper( child )
         for name, data in output_datasets.items():
             # Write outputs to the working directory (for security purposes) if desired.
-            if self.app.config.outputs_to_working_directory and working_directory is not None:
-                false_path = os.path.abspath( os.path.join( working_directory, "galaxy_dataset_%d.dat" % data.dataset.id ) )
-                param_dict[name] = DatasetFilenameWrapper( data, false_path = false_path )
-                open( false_path, 'w' ).close()
+            if self.app.config.outputs_to_working_directory:
+                try:
+                    false_path = [ dp.false_path for dp in output_paths if dp.real_path == data.file_name ][0]
+                    param_dict[name] = DatasetFilenameWrapper( data, false_path = false_path )
+                    open( false_path, 'w' ).close()
+                except IndexError:
+                    log.warning( "Unable to determine alternate path for writing job outputs, outputs will be written to their real paths" )
+                    param_dict[name] = DatasetFilenameWrapper( data )
             else:
                 param_dict[name] = DatasetFilenameWrapper( data )
             # Provide access to a path to store additional files



More information about the galaxy-dev mailing list