[galaxy-dev] [hg] galaxy 1731: Stop jobs if any of their output datasets grow larger than a defined limit
Greg Von Kuster
greg at bx.psu.edu
Thu Feb 5 16:52:08 EST 2009
details: http://www.bx.psu.edu/hg/galaxy/rev/e26741c8c642
changeset: 1731:e26741c8c642
user: Nate Coraor <nate at bx.psu.edu>
date: Tue Feb 03 14:15:09 2009 -0500
description:
Stop jobs if any of their output datasets grow larger than a defined limit
5 file(s) affected in this change:
lib/galaxy/config.py
lib/galaxy/jobs/__init__.py
lib/galaxy/jobs/runners/local.py
lib/galaxy/jobs/runners/pbs.py
lib/galaxy/tools/__init__.py
diffs (309 lines):
diff -r 0424f713d58d -r e26741c8c642 lib/galaxy/config.py
--- a/lib/galaxy/config.py Tue Feb 03 14:14:02 2009 -0500
+++ b/lib/galaxy/config.py Tue Feb 03 14:15:09 2009 -0500
@@ -47,6 +47,7 @@
self.cluster_files_directory = os.path.abspath( kwargs.get( "cluster_files_directory", "database/pbs" ) )
self.job_working_directory = resolve_path( kwargs.get( "job_working_directory", "database/job_working_directory" ), self.root )
self.outputs_to_working_directory = string_as_bool( kwargs.get( 'outputs_to_working_directory', False ) )
+ self.output_size_limit = int( kwargs.get( 'output_size_limit', 0 ) )
self.admin_pass = kwargs.get('admin_pass',"galaxy")
self.sendmail_path = kwargs.get('sendmail_path',"/usr/sbin/sendmail")
self.mailing_join_addr = kwargs.get('mailing_join_addr',"galaxy-user-join at bx.psu.edu")
diff -r 0424f713d58d -r e26741c8c642 lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py Tue Feb 03 14:14:02 2009 -0500
+++ b/lib/galaxy/jobs/__init__.py Tue Feb 03 14:15:09 2009 -0500
@@ -305,6 +305,7 @@
# and job recovery fail.
self.working_directory = \
os.path.join( self.app.config.job_working_directory, str( self.job_id ) )
+ self.output_paths = None
def get_param_dict( self ):
"""
@@ -342,7 +343,7 @@
incoming['userId'] = userId
incoming['userEmail'] = userEmail
# Build params, done before hook so hook can use
- param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.working_directory )
+ param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames() )
# Certain tools require tasks to be completed prior to job execution
# ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ).
if self.tool.tool_type is not None:
@@ -371,7 +372,7 @@
self.param_dict = param_dict
self.extra_filenames = extra_filenames
return extra_filenames
-
+
def fail( self, message, exception=False ):
"""
Indicate job failure by setting state and message on all output
@@ -385,14 +386,14 @@
# Do this first in case we generate a traceback below
if exception:
job.traceback = traceback.format_exc()
- for dataset_assoc in job.output_datasets:
- if self.app.config.outputs_to_working_directory:
- false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % dataset_assoc.dataset.dataset.id ) )
+ if self.app.config.outputs_to_working_directory:
+ for dataset_path in self.get_output_fnames():
try:
- shutil.move( false_path, dataset_assoc.dataset.file_name )
- log.debug( "fail(): Moved %s to %s" % ( false_path, dataset_assoc.dataset.file_name ) )
+ shutil.move( dataset_path.false_path, dataset_path.real_path )
+ log.debug( "fail(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
except ( IOError, OSError ), e:
log.error( "fail(): Missing output file in working directory: %s" % e )
+ for dataset_assoc in job.output_datasets:
dataset = dataset_assoc.dataset
dataset.refresh()
dataset.state = dataset.states.ERROR
@@ -452,15 +453,15 @@
job.state = "error"
else:
job.state = 'ok'
+ if self.app.config.outputs_to_working_directory:
+ for dataset_path in self.get_output_fnames():
+ try:
+ shutil.move( dataset_path.false_path, dataset_path.real_path )
+ log.debug( "finish(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
+ except ( IOError, OSError ):
+ self.fail( "Job %s's output dataset(s) could not be read" % job.id )
+ return
for dataset_assoc in job.output_datasets:
- if self.app.config.outputs_to_working_directory:
- false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % dataset_assoc.dataset.dataset.id ) )
- try:
- shutil.move( false_path, dataset_assoc.dataset.file_name )
- log.debug( "finish(): Moved %s to %s" % ( false_path, dataset_assoc.dataset.file_name ) )
- except ( IOError, OSError ):
- self.fail( "The job's output dataset(s) could not be read" )
- return
for dataset in dataset_assoc.dataset.dataset.history_associations: #need to update all associated output hdas, i.e. history was shared with job running
dataset.blurb = 'done'
dataset.peek = 'no peek'
@@ -519,7 +520,7 @@
for fname in self.extra_filenames:
os.remove( fname )
if self.working_directory is not None:
- os.rmdir( self.working_directory )
+ shutil.rmtree( self.working_directory )
except:
log.exception( "Unable to cleanup job %d" % self.job_id )
@@ -543,9 +544,36 @@
return filenames
def get_output_fnames( self ):
+ """
+ Return this job's outputs as a list of DatasetPath objects, built once and
+ cached on self.output_paths. Each entry holds the dataset's real file name
+ and, when outputs_to_working_directory is enabled, the alternate ("false")
+ path in the job working directory. str() of an entry yields the false path
+ when one is set, otherwise the real path.
+ """
+ if self.output_paths is not None:
+ return self.output_paths
+
+ class DatasetPath( object ):
+ def __init__( self, real_path, false_path = None ):
+ self.real_path = real_path
+ self.false_path = false_path
+ def __str__( self ):
+ # Must read the attribute: a bare false_path would resolve to the
+ # enclosing function's loop variable, which is unbound (NameError)
+ # when outputs_to_working_directory is off.
+ if self.false_path is None:
+ return self.real_path
+ else:
+ return self.false_path
+
job = model.Job.get( self.job_id )
- return [ da.dataset.file_name for da in job.output_datasets ]
+ if self.app.config.outputs_to_working_directory:
+ self.output_paths = []
+ for da in job.output_datasets:
+ data = da.dataset
+ false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % data.id ) )
+ self.output_paths.append( DatasetPath( data.file_name, false_path ) )
+ else:
+ self.output_paths = [ DatasetPath( da.dataset.file_name ) for da in job.output_datasets ]
+ return self.output_paths
+ def check_output_sizes( self ):
+ """
+ Return a list of ( path, size ) tuples, one per job output, where path is
+ str() of the corresponding get_output_fnames() entry and size is its
+ st_size. An output that does not exist yet (the tool has not created it)
+ is reported with size 0 instead of raising OSError out of the caller's
+ polling loop.
+ """
+ sizes = []
+ for outfile in [ str( o ) for o in self.get_output_fnames() ]:
+ if os.path.exists( outfile ):
+ sizes.append( ( outfile, os.stat( outfile ).st_size ) )
+ else:
+ sizes.append( ( outfile, 0 ) )
+ return sizes
+
class DefaultJobDispatcher( object ):
def __init__( self, app ):
self.app = app
diff -r 0424f713d58d -r e26741c8c642 lib/galaxy/jobs/runners/local.py
--- a/lib/galaxy/jobs/runners/local.py Tue Feb 03 14:14:02 2009 -0500
+++ b/lib/galaxy/jobs/runners/local.py Tue Feb 03 14:15:09 2009 -0500
@@ -4,6 +4,7 @@
import threading
from galaxy import model
+from galaxy.datatypes.data import nice_size
import os, errno
from time import sleep
@@ -68,9 +69,31 @@
env = env,
preexec_fn = os.setpgrp )
job_wrapper.set_runner( 'local:///', proc.pid )
+ if self.app.config.output_size_limit > 0:
+ sleep_time = 1
+ while proc.poll() is None:
+ for outfile, size in job_wrapper.check_output_sizes():
+ if size > self.app.config.output_size_limit:
+ # Error the job immediately
+ job_wrapper.fail( 'Job output grew too large (greater than %s), please try different job parameters or' \
+ % nice_size( self.app.config.output_size_limit ) )
+ log.warning( 'Terminating job %s due to output %s growing larger than %s limit' \
+ % ( job_wrapper.job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) )
+ # Then kill it
+ os.killpg( proc.pid, 15 )
+ sleep( 1 )
+ if proc.poll() is None:
+ os.killpg( proc.pid, 9 )
+ proc.wait() # reap
+ log.debug( 'Job %s (pid %s) terminated' % ( job_wrapper.job_id, proc.pid ) )
+ return
+ sleep( sleep_time )
+ if sleep_time < 8:
+ # So we don't stat every second
+ sleep_time *= 2
stdout = proc.stdout.read()
stderr = proc.stderr.read()
- proc.wait()
+ proc.wait() # reap
log.debug('execution finished: %s' % command_line)
except Exception, exc:
job_wrapper.fail( "failure running job", exception=True )
diff -r 0424f713d58d -r e26741c8c642 lib/galaxy/jobs/runners/pbs.py
--- a/lib/galaxy/jobs/runners/pbs.py Tue Feb 03 14:14:02 2009 -0500
+++ b/lib/galaxy/jobs/runners/pbs.py Tue Feb 03 14:15:09 2009 -0500
@@ -2,6 +2,8 @@
from Queue import Queue, Empty
from galaxy import model
+from galaxy.datatypes.data import nice_size
+
from paste.deploy.converters import asbool
import pkg_resources
@@ -60,6 +62,7 @@
self.ofile = None
self.efile = None
self.runner_url = None
+ self.check_count = 0
class PBSJobRunner( object ):
"""
@@ -71,6 +74,8 @@
# Check if PBS was importable, fail if not
if pbs is None:
raise Exception( "PBSJobRunner requires pbs-python which was not found" )
+ if app.config.pbs_application_server and app.config.outputs_to_working_directory:
+ raise Exception( "pbs_application_server (file staging) and outputs_to_working_directory options are mutually exclusive" )
self.app = app
# 'watched' and 'queue' are both used to keep track of jobs to watch.
# 'queue' is used to add new watched jobs, and can be called from
@@ -132,6 +137,8 @@
self.queue_job( obj )
elif op == 'finish':
self.finish_job( obj )
+ elif op == 'fail_oversize_job':
+ self.fail_oversize_job( obj )
except:
log.exception( "Uncaught exception %sing job" % op )
@@ -173,8 +180,9 @@
if self.app.config.pbs_application_server:
pbs_ofile = self.app.config.pbs_application_server + ':' + ofile
pbs_efile = self.app.config.pbs_application_server + ':' + efile
- stagein = self.get_stage_in_out( job_wrapper.get_input_fnames() + job_wrapper.get_output_fnames(), symlink=True )
- stageout = self.get_stage_in_out( job_wrapper.get_output_fnames() )
+ output_files = [ str( o ) for o in job_wrapper.get_output_fnames() ]
+ stagein = self.get_stage_in_out( job_wrapper.get_input_fnames() + output_files, symlink=True )
+ stageout = self.get_stage_in_out( output_files )
job_attrs = pbs.new_attropl(5)
job_attrs[0].name = pbs.ATTR_o
job_attrs[0].value = pbs_ofile
@@ -298,6 +306,20 @@
if state == "R" and not pbs_job_state.running:
pbs_job_state.running = True
pbs_job_state.job_wrapper.change_state( "running" )
+ if self.app.config.output_size_limit > 0 and state == "R" and (pbs_job_state.check_count % 10) == 0:
+ # Every 10th time a job is checked, check the size of its outputs.
+ fail = False
+ for outfile, size in pbs_job_state.job_wrapper.check_output_sizes():
+ if size > self.app.config.output_size_limit:
+ pbs_job_state.fail_message = 'Job output grew too large (greater than %s), please try different job parameters or' \
+ % nice_size( self.app.config.output_size_limit )
+ log.warning( '(%s/%s) Dequeueing job due to output %s growing larger than %s limit' \
+ % ( galaxy_job_id, job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) )
+ self.work_queue.put( ( 'fail_oversize_job', pbs_job_state ) )
+ fail = True
+ break
+ if fail:
+ continue
pbs_job_state.old_state = state
new_watched.append( pbs_job_state )
else:
@@ -329,6 +351,7 @@
pbs_server_name = self.determine_pbs_server( pbs_job_state.runner_url )
if pbs_server_name not in servers:
servers.append( pbs_server_name )
+ pbs_job_state.check_count += 1
for pbs_server_name in servers:
c = pbs.pbs_connect( pbs_server_name )
if c <= 0:
@@ -386,6 +409,14 @@
# clean up the pbs files
self.cleanup( ( ofile, efile, job_file ) )
+ def fail_oversize_job( self, pbs_job_state ):
+ """
+ Separated out so we can use the worker threads for it.
+
+ Handles the 'fail_oversize_job' work-queue operation, which the status
+ check queues when one of a running job's outputs grows past
+ output_size_limit. The order matters: ask stop_job() to remove the job
+ from PBS, fail the Galaxy job with pbs_job_state.fail_message, then
+ remove the job's ofile, efile and job_file (cleanup() keeps them when
+ debug is enabled).
+ """
+ self.stop_job( self.app.model.Job.get( pbs_job_state.job_wrapper.job_id ) )
+ pbs_job_state.job_wrapper.fail( pbs_job_state.fail_message )
+ self.cleanup( ( pbs_job_state.ofile, pbs_job_state.efile, pbs_job_state.job_file ) )
+
def cleanup( self, files ):
if not asbool( self.app.config.get( 'debug', False ) ):
for file in files:
@@ -431,7 +462,7 @@
return
pbs.pbs_deljob( c, str( job.job_runner_external_id ), 'NULL' )
pbs.pbs_disconnect( c )
- log.debug( "(%s/%s) Removed from PBS queue at user's request" % ( job.id, job.job_runner_external_id ) )
+ log.debug( "(%s/%s) Removed from PBS queue before job completion" % ( job.id, job.job_runner_external_id ) )
def recover( self, job, job_wrapper ):
"""Recovers jobs stuck in the queued/running state when Galaxy started"""
diff -r 0424f713d58d -r e26741c8c642 lib/galaxy/tools/__init__.py
--- a/lib/galaxy/tools/__init__.py Tue Feb 03 14:14:02 2009 -0500
+++ b/lib/galaxy/tools/__init__.py Tue Feb 03 14:15:09 2009 -0500
@@ -1045,7 +1045,7 @@
input.validate( value, None )
input_values[ input.name ] = value
- def build_param_dict( self, incoming, input_datasets, output_datasets, working_directory = None ):
+ def build_param_dict( self, incoming, input_datasets, output_datasets, output_paths ):
"""
Build the dictionary of parameters for substituting into the command
line. Each value is wrapped in a `InputValueWrapper`, which allows
@@ -1098,10 +1098,14 @@
param_dict[ "_CHILD___%s___%s" % ( name, child.designation ) ] = DatasetFilenameWrapper( child )
for name, data in output_datasets.items():
# Write outputs to the working directory (for security purposes) if desired.
- if self.app.config.outputs_to_working_directory and working_directory is not None:
- false_path = os.path.abspath( os.path.join( working_directory, "galaxy_dataset_%d.dat" % data.dataset.id ) )
- param_dict[name] = DatasetFilenameWrapper( data, false_path = false_path )
- open( false_path, 'w' ).close()
+ if self.app.config.outputs_to_working_directory:
+ try:
+ false_path = [ dp.false_path for dp in output_paths if dp.real_path == data.file_name ][0]
+ param_dict[name] = DatasetFilenameWrapper( data, false_path = false_path )
+ open( false_path, 'w' ).close()
+ except IndexError:
+ log.warning( "Unable to determine alternate path for writing job outputs, outputs will be written to their real paths" )
+ param_dict[name] = DatasetFilenameWrapper( data )
else:
param_dict[name] = DatasetFilenameWrapper( data )
# Provide access to a path to store additional files
More information about the galaxy-dev mailing list