Index: wsrf-cvs/ws-gram/job_monitoring/common/c/source/Makefile.am =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/common/c/source/Makefile.am,v retrieving revision 1.5.64.2 retrieving revision 1.5.64.2.10.1 diff -u -r1.5.64.2 -r1.5.64.2.10.1 --- wsrf-cvs/ws-gram/job_monitoring/common/c/source/Makefile.am 12 Mar 2007 10:14:46 -0000 1.5.64.2 +++ wsrf-cvs/ws-gram/job_monitoring/common/c/source/Makefile.am 28 Jan 2008 21:15:55 -0000 1.5.64.2.10.1 @@ -28,7 +28,7 @@ libexec_PROGRAMS = globus-scheduler-event-generator -libglobus_scheduler_event_generator___GLOBUS_FLAVOR_NAME__la_SOURCES = globus_scheduler_event_generator_stdout.c globus_scheduler_event_generator.h globus_scheduler_event_generator.c +libglobus_scheduler_event_generator___GLOBUS_FLAVOR_NAME__la_SOURCES = globus_scheduler_event_generator_stdout.c globus_scheduler_event_generator.h globus_scheduler_event_generator.c globus_scheduler_event_generator_stdout.h libglobus_scheduler_event_generator___GLOBUS_FLAVOR_NAME__la_LDFLAGS = $(GPT_LDFLAGS) libglobus_scheduler_event_generator___GLOBUS_FLAVOR_NAME__la_LIBADD = $(GPT_LIB_LINKS) Index: wsrf-cvs/ws-gram/job_monitoring/common/c/source/dirt.sh =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/common/c/source/dirt.sh,v retrieving revision 1.12.2.5 retrieving revision 1.12.2.5.10.2 diff -u -r1.12.2.5 -r1.12.2.5.10.2 --- wsrf-cvs/ws-gram/job_monitoring/common/c/source/dirt.sh 10 Jun 2007 23:38:42 -0000 1.12.2.5 +++ wsrf-cvs/ws-gram/job_monitoring/common/c/source/dirt.sh 28 Jan 2008 21:15:56 -0000 1.12.2.5.10.2 @@ -1,2 +1,2 @@ -DIRT_TIMESTAMP=1181518722 -DIRT_BRANCH_ID=63 +DIRT_TIMESTAMP=1201554956 +DIRT_BRANCH_ID=0 Index: wsrf-cvs/ws-gram/job_monitoring/common/c/source/globus_scheduler_event_generator.c =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/common/c/source/globus_scheduler_event_generator.c,v retrieving revision 1.8.2.1 retrieving revision 1.8.2.1.10.1 diff -u -r1.8.2.1 -r1.8.2.1.10.1 --- wsrf-cvs/ws-gram/job_monitoring/common/c/source/globus_scheduler_event_generator.c 12 Mar 2007 10:14:46 -0000 1.8.2.1 +++ wsrf-cvs/ws-gram/job_monitoring/common/c/source/globus_scheduler_event_generator.c 28 Jan 2008 21:15:55 -0000 1.8.2.1.10.1 @@ -392,7 +392,9 @@ { return GLOBUS_SEG_ERROR_NULL; } + globus_mutex_lock(&globus_l_seg_mutex); *timestamp = globus_l_seg_timestamp; + globus_mutex_unlock(&globus_l_seg_mutex); return GLOBUS_SUCCESS; } @@ -406,7 +408,7 @@ globus_result_t result = GLOBUS_SUCCESS; globus_mutex_lock(&globus_l_seg_mutex); - if (globus_l_seg_timestamp != 0) + if (globus_l_seg_timestamp > timestamp) { result = GLOBUS_SEG_ERROR_ALREADY_SET; goto error; Index: wsrf-cvs/ws-gram/job_monitoring/common/c/source/globus_scheduler_event_generator_stdout.c =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/common/c/source/globus_scheduler_event_generator_stdout.c,v retrieving revision 1.4.4.2 retrieving revision 1.4.4.2.10.1 diff -u -r1.4.4.2 -r1.4.4.2.10.1 --- wsrf-cvs/ws-gram/job_monitoring/common/c/source/globus_scheduler_event_generator_stdout.c 10 Jun 2007 23:38:42 -0000 1.4.4.2 +++ wsrf-cvs/ws-gram/job_monitoring/common/c/source/globus_scheduler_event_generator_stdout.c 28 Jan 2008 21:15:55 -0000 1.4.4.2.10.1 @@ -17,6 +17,11 @@ #include "globus_gram_protocol.h" #include "version.h" +enum +{ + GLOBUS_SEG_INPUT_BUFFER_SIZE = 256 +}; + static globus_result_t globus_l_seg_register_write( @@ -64,7 +69,8 @@ static globus_xio_handle_t globus_l_seg_input_handle; static globus_xio_stack_t globus_l_seg_file_stack; static globus_xio_driver_t globus_l_seg_file_driver; -static char globus_l_seg_input_buffer[1]; +static globus_byte_t * globus_l_seg_input_buffer; +static size_t globus_l_seg_input_buffer_size; static time_t globus_l_seg_timestamp; static globus_fifo_t globus_l_seg_buffers; static globus_bool_t globus_l_seg_write_registered; @@ -283,10 +289,13 @@ goto destroy_mutex_error; } + globus_l_seg_input_buffer_size = GLOBUS_SEG_INPUT_BUFFER_SIZE; + globus_l_seg_input_buffer = malloc(globus_l_seg_input_buffer_size); + result = globus_xio_register_read( globus_l_seg_input_handle, globus_l_seg_input_buffer, - sizeof(globus_l_seg_input_buffer), + globus_l_seg_input_buffer_size, 1, NULL, globus_l_xio_read_eof_callback, @@ -420,7 +429,7 @@ va_end(ap); globus_mutex_lock(&globus_l_seg_mutex); - result = globus_l_seg_register_write(buf); + result = globus_l_seg_register_write((globus_byte_t *) buf); globus_mutex_unlock(&globus_l_seg_mutex); error: @@ -439,18 +448,60 @@ globus_xio_data_descriptor_t data_desc, void * user_arg) { + globus_byte_t * tmp; + ptrdiff_t offset; + unsigned long stamp; + globus_byte_t *next, *prev, *end; + + if (nbytes == len) + { + offset = buffer - globus_l_seg_input_buffer; + + len += globus_l_seg_input_buffer_size; + globus_l_seg_input_buffer_size *= 2; + tmp = realloc(globus_l_seg_input_buffer, + globus_l_seg_input_buffer_size * 2); + globus_assert (tmp != NULL); + globus_l_seg_input_buffer = tmp; + + buffer = globus_l_seg_input_buffer + offset; + } + + end = buffer + nbytes; + *(end+1) = 0; + + prev = globus_l_seg_input_buffer; + next = globus_l_seg_input_buffer; + + while (NULL != (next = strchr((char *) next, '\n'))) + { + *next = '\0'; + if (sscanf((char *) prev, "200 %lu", &stamp) == 1) + { + globus_scheduler_event_generator_set_timestamp((time_t) stamp); + } + next++; + prev = next; + } + + if (prev != globus_l_seg_input_buffer) + { + memmove(globus_l_seg_input_buffer, + prev, + end - prev); + } if (result == GLOBUS_SUCCESS) { - /* shouldn't be reading stuff here!?! */ result = globus_xio_register_read( globus_l_seg_input_handle, - globus_l_seg_input_buffer, - sizeof(globus_l_seg_input_buffer), + globus_l_seg_input_buffer + (end - prev), + globus_l_seg_input_buffer_size - (end - prev), 1, NULL, globus_l_xio_read_eof_callback, NULL); + return; } globus_scheduler_event_generator_fault(result); Index: wsrf-cvs/ws-gram/job_monitoring/common/c/source/pkgdata/pkg_data_src.gpt.in =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/common/c/source/pkgdata/pkg_data_src.gpt.in,v retrieving revision 1.9.12.3 retrieving revision 1.9.12.3.10.1 diff -u -r1.9.12.3 -r1.9.12.3.10.1 --- wsrf-cvs/ws-gram/job_monitoring/common/c/source/pkgdata/pkg_data_src.gpt.in 10 Jun 2007 23:38:41 -0000 1.9.12.3 +++ wsrf-cvs/ws-gram/job_monitoring/common/c/source/pkgdata/pkg_data_src.gpt.in 28 Jan 2008 21:15:56 -0000 1.9.12.3.10.1 @@ -3,7 +3,7 @@ - + Scheduler Event Generator ResourceManagement Index: wsrf-cvs/ws-gram/job_monitoring/common/java/source/pkgdata/pkg_data_src.gpt =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/common/java/source/pkgdata/pkg_data_src.gpt,v retrieving revision 1.8.14.3 retrieving revision 1.8.14.3.10.2 diff -u -r1.8.14.3 -r1.8.14.3.10.2 --- wsrf-cvs/ws-gram/job_monitoring/common/java/source/pkgdata/pkg_data_src.gpt 17 Jan 2006 19:35:12 -0000 1.8.14.3 +++ wsrf-cvs/ws-gram/job_monitoring/common/java/source/pkgdata/pkg_data_src.gpt 29 Jan 2008 19:31:50 -0000 1.8.14.3.10.2 @@ -3,7 +3,7 @@ - + GT4 GRAM Job Monitoring Common Classes (Java) Execution Index: wsrf-cvs/ws-gram/job_monitoring/common/java/source/src/org/globus/exec/monitoring/JobStateMonitor.java =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/common/java/source/src/org/globus/exec/monitoring/JobStateMonitor.java,v retrieving revision 1.10.2.2 retrieving revision 1.10.2.2.10.2 diff -u -r1.10.2.2 -r1.10.2.2.10.2 --- wsrf-cvs/ws-gram/job_monitoring/common/java/source/src/org/globus/exec/monitoring/JobStateMonitor.java 27 Nov 2006 19:01:53 -0000 1.10.2.2 +++ wsrf-cvs/ws-gram/job_monitoring/common/java/source/src/org/globus/exec/monitoring/JobStateMonitor.java 29 Jan 2008 19:31:51 -0000 1.10.2.2.10.2 @@ -408,6 +408,7 @@ d = lastEventTimestamp; } + seg.updateRecoveryTimestamp(d); recoveryListener.updateJobMonitorRecoveryTimeStamp(this, d); logger.debug("Exiting updateRecoveryInfo()"); } @@ -501,18 +502,15 @@ String localId = e.getLocalId(); synchronized (mapping) { - ResourceKey mapping = getMapping(localId); + ResourceKey mapping = getMapping(localId); - if (mapping != null) { - logger.debug("Dispatching event " + e.getLocalId() - + " to job " + mapping.getValue()); - dispatchQueue.add(e); - } else { - logger.debug("Caching event " + e.getLocalId()); - cacheEvent(e); + if (mapping != null) { + logger.debug("Dispatching event " + e.getLocalId() + + " to job " + mapping.getValue()); + dispatchQueue.add(e); + } } } - } public JobStateChangeListener getListener () { Index: wsrf-cvs/ws-gram/job_monitoring/common/java/source/src/org/globus/exec/monitoring/SchedulerEventGenerator.java =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/common/java/source/src/org/globus/exec/monitoring/Attic/SchedulerEventGenerator.java,v retrieving revision 1.8.2.4 retrieving revision 1.8.2.4.10.1 diff -u -r1.8.2.4 -r1.8.2.4.10.1 --- wsrf-cvs/ws-gram/job_monitoring/common/java/source/src/org/globus/exec/monitoring/SchedulerEventGenerator.java 27 Nov 2006 19:01:53 -0000 1.8.2.4 +++ wsrf-cvs/ws-gram/job_monitoring/common/java/source/src/org/globus/exec/monitoring/SchedulerEventGenerator.java 28 Jan 2008 21:15:56 -0000 1.8.2.4.10.1 @@ -46,6 +46,7 @@ private String schedulerName; /** SEG Process handle */ private Process proc; + private java.io.OutputStreamWriter procInput; /** * Flag indicating that the SEG process should no longer be * restarted and the thread should terminate. @@ -126,6 +127,9 @@ String input; logger.debug("getting seg input"); + + procInput = new java.io.OutputStreamWriter( + proc.getOutputStream(), "UTF-8"); stdout = new java.io.BufferedReader( new java.io.InputStreamReader( proc.getInputStream())); @@ -205,6 +209,22 @@ } /** + * Let the Scheduler Event Generator know that it is safe to store + * old log information to an archive + */ + public void updateRecoveryTimestamp(java.util.Date d) + { + try { + procInput.write( + "200 " + Long.toString((d.getTime() / 1000)) + "\n"); + procInput.flush(); + } catch (java.io.IOException ioe) { + logger.warn(ioe); + } + } + + + /** * Start a scheduler event generator process. * * This function is called to start a new scheduler event generator Index: wsrf-cvs/ws-gram/job_monitoring/condor/c/setup/dirt.sh =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/condor/c/setup/dirt.sh,v retrieving revision 1.4 retrieving revision 1.4.100.1 diff -u -r1.4 -r1.4.100.1 --- wsrf-cvs/ws-gram/job_monitoring/condor/c/setup/dirt.sh 29 Nov 2004 19:20:21 -0000 1.4 +++ wsrf-cvs/ws-gram/job_monitoring/condor/c/setup/dirt.sh 28 Jan 2008 21:15:54 -0000 1.4.100.1 @@ -1,2 +1,2 @@ -DIRT_TIMESTAMP=1101756021 -DIRT_BRANCH_ID=1 +DIRT_TIMESTAMP=1201554954 +DIRT_BRANCH_ID=0 Index: wsrf-cvs/ws-gram/job_monitoring/condor/c/setup/setup-seg-condor.pl =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/condor/c/setup/setup-seg-condor.pl,v retrieving revision 1.4 retrieving revision 1.4.126.1 diff -u -r1.4 -r1.4.126.1 --- wsrf-cvs/ws-gram/job_monitoring/condor/c/setup/setup-seg-condor.pl 29 Nov 2004 19:20:20 -0000 1.4 +++ wsrf-cvs/ws-gram/job_monitoring/condor/c/setup/setup-seg-condor.pl 28 Jan 2008 21:15:54 -0000 1.4.126.1 @@ -18,9 +18,12 @@ use Getopt::Long; my $path = $ENV{GLOBUS_LOCATION} . '/var/globus-condor.log'; +my $dir = $ENV{GLOBUS_LOCATION} . '/var/globus-condor'; +my $perjob = 0; my $help = 0; GetOptions('path|p=s' => \$path, + 'dir|d=s' => \$dir, 'help|h' => \$help); &usage if $help; @@ -39,6 +42,29 @@ } chmod(0666, $path); } +if (-e $dir && ! -d $dir) +{ + print STDERR "Log directory $dir exists but is not a directory\n"; + exit 1; +} +elsif (-e $dir) +{ + if (-o $dir) + { + chmod 01777, $dir; + } +} +else +{ + mkdir $dir; + chmod 01777, $dir; +} + +if (((stat($dir))[2] & 01777) != 01777) +{ + print STDERR "Invalid permissons on log dir: $dir\n"; + exit 1; +} my $metadata = new Grid::GPT::Setup(package_name => @@ -50,6 +76,7 @@ open(FP, ">$globusdir/etc/globus-condor.conf"); print FP "log_path=$path\n"; +print FP "log_dir=$dir\n"; close(FP); $metadata->finish(); @@ -58,6 +85,7 @@ { print "Usage: $0 [options]\n". "Options: [--path|-p path to place condor log]\n". + " [--dir|-d dir to use for storing individual condor job logs\n". " [--help|-h]\n". " default path is $ENV{GLOBUS_LOCATION}/var/globus-condor.log\n"; exit 1; Index: wsrf-cvs/ws-gram/job_monitoring/condor/c/setup/pkgdata/pkg_data_src.gpt.in =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/condor/c/setup/pkgdata/pkg_data_src.gpt.in,v retrieving revision 1.3 retrieving revision 1.3.100.1 diff -u -r1.3 -r1.3.100.1 --- wsrf-cvs/ws-gram/job_monitoring/condor/c/setup/pkgdata/pkg_data_src.gpt.in 29 Nov 2004 19:20:21 -0000 1.3 +++ wsrf-cvs/ws-gram/job_monitoring/condor/c/setup/pkgdata/pkg_data_src.gpt.in 28 Jan 2008 21:15:54 -0000 1.3.100.1 @@ -3,7 +3,7 @@ - + Scheduler Event Generator Condor Setup ResourceManagement Index: wsrf-cvs/ws-gram/job_monitoring/condor/c/source/dirt.sh =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/condor/c/source/dirt.sh,v retrieving revision 1.9.2.9 retrieving revision 1.9.2.9.10.1 diff -u -r1.9.2.9 -r1.9.2.9.10.1 --- wsrf-cvs/ws-gram/job_monitoring/condor/c/source/dirt.sh 31 Oct 2006 14:02:23 -0000 1.9.2.9 +++ wsrf-cvs/ws-gram/job_monitoring/condor/c/source/dirt.sh 28 Jan 2008 21:15:55 -0000 1.9.2.9.10.1 @@ -1,2 +1,2 @@ -DIRT_TIMESTAMP=1162303343 -DIRT_BRANCH_ID=63 +DIRT_TIMESTAMP=1201554955 +DIRT_BRANCH_ID=0 Index: wsrf-cvs/ws-gram/job_monitoring/condor/c/source/seg_condor_module.c =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/condor/c/source/seg_condor_module.c,v retrieving revision 1.4.2.4 retrieving revision 1.4.2.4.10.1 diff -u -r1.4.2.4 -r1.4.2.4.10.1 --- wsrf-cvs/ws-gram/job_monitoring/condor/c/source/seg_condor_module.c 31 Oct 2006 14:02:22 -0000 1.4.2.4 +++ wsrf-cvs/ws-gram/job_monitoring/condor/c/source/seg_condor_module.c 28 Jan 2008 21:15:55 -0000 1.4.2.4.10.1 @@ -9,13 +9,15 @@ * modifications, you must include this notice in the file. */ - #include "globus_common.h" #include "globus_scheduler_event_generator.h" +#include "globus_scheduler_event_generator_app.h" #include "version.h" +#include "libxml/xmlreader.h" #include #include +#include #define SEGCondorEnter() \ SEGCondorDebug(SEG_CONDOR_DEBUG_INFO, ("Enter %s\n", _globus_func_name)) @@ -66,7 +68,7 @@ GlobusDebugPrintf(SEG_CONDOR, level, message) #else #define SEGCondorDebug(level, message) \ - if (level == SEG_CONDOR_DEBUG_ERROR) \ + if (1 /*level == SEG_CONDOR_DEBUG_ERROR*/) \ { \ fprintf(stderr, "%s", globus_l_seg_condor_level_string(level)); \ globus_l_seg_condor_debug message; \ @@ -102,84 +104,71 @@ } } -/** - * State of the CONDOR log file parser. - */ -typedef struct -{ - /** Path of the current log file being parsed */ - char * path; - /** Timestamp of when to start generating events from */ - time_t start_timestamp; - /** Stdio file handle of the log file */ - FILE * fp; - /** Buffer of log file data */ - char * buffer; - /** Length of the buffer */ - size_t buffer_length; - /** Starting offset of valid data in the buffer. */ - size_t buffer_point; - /** Amount of valid data in the buffer */ - size_t buffer_valid; - /** - * Flag indicating a Log close event indicating that the current - * log was found in the log - */ - globus_bool_t end_of_log; - /** - * Flag inidicating that this logfile isn't the one corresponding to - * today, so and EOF on it should require us to close and open a newer - * one - */ - globus_bool_t old_log; -} globus_l_condor_logfile_state_t; +static +int +globus_l_condor_process_file( + const char * full_path_name, + globus_bool_t flush_old_files, + time_t start_time, + time_t sweep_time, + time_t flush_timestamp, + globus_list_t ** event_list); static int -globus_l_condor_parse_event( +globus_l_condor_parse_events( char * buffer, - time_t start_timestamp); - -static globus_mutex_t globus_l_condor_mutex; -static globus_cond_t globus_l_condor_cond; -static globus_bool_t shutdown_called; -static int callback_count; - - -GlobusDebugDefine(SEG_CONDOR); + size_t size, + time_t start_time, + time_t sweep_time, + globus_list_t ** event_list, + globus_bool_t * terminated); static int -globus_l_condor_module_activate(void); +globus_l_condor_parse_event( + char * buffer, + size_t len, + globus_scheduler_event_t * evt); static int -globus_l_condor_module_deactivate(void); +globus_l_event_sort(void *lo, void *hi, void *ignore); static void -globus_l_condor_read_callback( - void * user_arg); +globus_l_condor_seg_emit_events(globus_list_t *event_list); static -int -globus_l_condor_parse_events( - globus_l_condor_logfile_state_t * state); +void +globus_l_condor_event_free(void * e); +/** Timestamp of when to start generating events from */ +static time_t globus_l_condor_start_timestamp; +/** Timestamp of files we can safely remove (if the jobs are terminated) */ +static time_t globus_l_condor_flush_timestamp; +/** Lock around shutdown data */ +static globus_mutex_t globus_l_condor_mutex; +/** Shutdown condition */ +static globus_cond_t globus_l_condor_cond; +/** Shutdown value */ +static globus_bool_t globus_l_condor_shutdown_called; +/** Handle of file polling periodic event */ +static globus_callback_handle_t globus_l_condor_callback_handle; + +GlobusDebugDefine(SEG_CONDOR); static int -globus_l_condor_clean_buffer( - globus_l_condor_logfile_state_t * state); +globus_l_condor_module_activate(void); static int -globus_l_condor_increase_buffer( - globus_l_condor_logfile_state_t * state); +globus_l_condor_module_deactivate(void); static -int -globus_l_condor_find_logfile( - globus_l_condor_logfile_state_t * state); +void +globus_l_condor_poll_callback( + void * user_arg); globus_module_descriptor_t globus_scheduler_event_module_ptr = @@ -197,10 +186,11 @@ int globus_l_condor_module_activate(void) { - globus_l_condor_logfile_state_t * logfile_state; int rc; globus_reltime_t delay; + globus_reltime_t period; globus_result_t result; + char * path; GlobusFuncName(globus_l_condor_module_activate); rc = globus_module_activate(GLOBUS_COMMON_MODULE); @@ -231,86 +221,56 @@ ("Fatal error initializing cond\n")); goto destroy_mutex_error; } - shutdown_called = GLOBUS_FALSE; - callback_count = 0; - - logfile_state = globus_libc_calloc( - 1, - sizeof(globus_l_condor_logfile_state_t)); - - if (logfile_state == NULL) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_ERROR, - ("Fatal error: out of memory\n")); - goto destroy_cond_error; - } - - rc = globus_l_condor_increase_buffer(logfile_state); - if (rc != GLOBUS_SUCCESS) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_ERROR, - ("Fatal error: out of memory\n")); - goto free_logfile_state_error; - } + globus_l_condor_shutdown_called = GLOBUS_FALSE; /* Configuration info */ result = globus_scheduler_event_generator_get_timestamp( - &logfile_state->start_timestamp); + &globus_l_condor_start_timestamp); + globus_l_condor_flush_timestamp = globus_l_condor_start_timestamp; if (result != GLOBUS_SUCCESS) { fprintf(stderr, "Fatal error \n"); SEGCondorDebug(SEG_CONDOR_DEBUG_ERROR, ("Fatal error (unable to parse timestamp)\n")); - goto free_logfile_state_buffer_error; + goto destroy_cond_error; } - /* Locate logfile */ - rc = globus_l_condor_find_logfile(logfile_state); - - if (rc == GLOBUS_SUCCESS) - { - logfile_state->fp = fopen(logfile_state->path, "r"); - - if (logfile_state->fp == NULL) - { - rc = SEG_CONDOR_ERROR_OUT_OF_MEMORY; - - SEGCondorDebug(SEG_CONDOR_DEBUG_ERROR, - ("Fatal error (open %s): %s\n", - logfile_state->path, - strerror(errno))); - - goto free_logfile_state_path_error; - } - } - else - { - goto free_logfile_state_path_error; + globus_common_get_attribute_from_config_file( + NULL, + "etc/globus-condor.conf", + "log_dir", + &path); + + if (path == NULL) + { + globus_common_get_attribute_from_config_file( + NULL, + "etc/globus-condor.conf", + "log_path", + &path); } + GlobusTimeReltimeSet(delay, 0, 0); + GlobusTimeReltimeSet(period, 2, 0); - result = globus_callback_register_oneshot( - NULL, + result = globus_callback_register_periodic( + &globus_l_condor_callback_handle, &delay, - globus_l_condor_read_callback, - logfile_state); + &period, + globus_l_condor_poll_callback, + path); if (result != GLOBUS_SUCCESS) { - goto free_logfile_state_path_error; + goto free_path_error; } - callback_count++; SEGCondorExit(); return 0; -free_logfile_state_path_error: - globus_libc_free(logfile_state->path); -free_logfile_state_buffer_error: - globus_libc_free(logfile_state->buffer); -free_logfile_state_error: - globus_libc_free(logfile_state); +free_path_error: + free(path); destroy_cond_error: globus_cond_destroy(&globus_l_condor_cond); destroy_mutex_error: @@ -327,14 +287,27 @@ int globus_l_condor_module_deactivate(void) { + globus_result_t result; + globus_bool_t active; GlobusFuncName(globus_l_condor_module_deactivate); SEGCondorEnter(); globus_mutex_lock(&globus_l_condor_mutex); - shutdown_called = GLOBUS_TRUE; + globus_l_condor_shutdown_called = GLOBUS_TRUE; + + result = globus_callback_unregister( + globus_l_condor_callback_handle, + NULL, + NULL, + &active); + + if (result == GLOBUS_SUCCESS && active == GLOBUS_FALSE) + { + globus_l_condor_callback_handle = GLOBUS_HANDLE_TABLE_NO_HANDLE; + } - while (callback_count > 0) + while (globus_l_condor_callback_handle) { globus_cond_wait(&globus_l_condor_cond, &globus_l_condor_mutex); } @@ -350,35 +323,52 @@ /* globus_l_condor_module_deactivate() */ /** - * read_cb: - * parse_events(buffer) + * condor log polling algorithm + * + * The goal of this is to periodically emit new SEG events for everything which + * has occurred since the last sweep. The code assumes only one GRAM job is + * present in each log file * - * if (!eof) // do i need to check stat state or will this behave well w/local - * // files? - * register read (read_cb) - * else - * if (it's an old logfile) - * register_close(old_close_cb) - * else - * register wakeup (wakeup_cb) + * Check multiple files and emit events for any that have occurred since + * the last sweep: + * sweep_time := time(NULL); + * flush_time := globus_scheduler_event_generator_get_timestamp() + * + * for each file in condor log dir: + * lstat each file. + * if owner != name or !regular file + * continue + * + * if modified between state->start_timestamp and sweep_time, + * open file + * lock + * parse all events between start_timestamp and sweep_time + * if (#submits = #terminated + #failed) + * remove file + * close file */ static void -globus_l_condor_read_callback( +globus_l_condor_poll_callback( void * user_arg) { + char * path = user_arg; int rc; - globus_l_condor_logfile_state_t * state = user_arg; - size_t max_to_read; - globus_bool_t eof_hit = GLOBUS_FALSE; - globus_reltime_t delay; - globus_result_t result; - GlobusFuncName(globus_l_condor_read_callback); + time_t sweep_time = time(NULL); + time_t flush_timestamp; + globus_bool_t flush_old_files = GLOBUS_FALSE; + globus_list_t * event_list = NULL; + struct stat st; + DIR * dir; + struct dirent * entry; + char * full_path_name; + globus_list_t * l; + GlobusFuncName(globus_l_condor_poll_callback); SEGCondorEnter(); globus_mutex_lock(&globus_l_condor_mutex); - if (shutdown_called) + if (globus_l_condor_shutdown_called) { SEGCondorDebug(SEG_CONDOR_DEBUG_INFO, ("Polling while deactivating\n")); @@ -387,749 +377,569 @@ } globus_mutex_unlock(&globus_l_condor_mutex); - if (state->fp != NULL) - { - /* Read data --- leave an extra byte space so we can null-terminate - * and use strstr() - */ - max_to_read = state->buffer_length - state->buffer_valid - - state->buffer_point - 1; - - SEGCondorDebug(SEG_CONDOR_DEBUG_TRACE, - ("reading a maximum of %u bytes\n", max_to_read)); - - rc = fread(state->buffer + state->buffer_point + state->buffer_valid, - 1, max_to_read, state->fp); - - SEGCondorDebug(SEG_CONDOR_DEBUG_TRACE, - ("read %d bytes\n", rc)); + globus_scheduler_event_generator_get_timestamp(&flush_timestamp); - if (rc < max_to_read) - { - if (feof(state->fp)) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_TRACE, ("EOF read\n")); - eof_hit = GLOBUS_TRUE; - clearerr(state->fp); - } - else - { - /* XXX: Read error */ - } - } + if (flush_timestamp > globus_l_condor_flush_timestamp) + { + flush_old_files = GLOBUS_TRUE; + } + else + { + flush_timestamp = globus_l_condor_flush_timestamp; + } - state->buffer_valid += rc; + rc = stat(path, &st); - /* Parse data */ - rc = globus_l_condor_parse_events(state); + if (rc < 0) + { + return; + } - rc = globus_l_condor_clean_buffer(state); + if (S_ISREG(st.st_mode)) + { + globus_l_condor_process_file( + path, + GLOBUS_FALSE, + globus_l_condor_start_timestamp, + sweep_time, + 0, + &event_list); + } + else if (S_ISDIR(st.st_mode)) + { + dir = opendir(path); - if (eof_hit) + while ((entry = readdir(dir)) != NULL) { - GlobusTimeReltimeSet(delay, 2, 0); - } - else - { - /* still data available in current file, hurry up! */ - GlobusTimeReltimeSet(delay, 0, 0); + full_path_name = globus_common_create_string( + "%s/%s", path, entry->d_name); + + globus_l_condor_process_file( + full_path_name, + flush_old_files, + globus_l_condor_start_timestamp, + sweep_time, + flush_timestamp, + &event_list); + + free(full_path_name); } + closedir(dir); } else { - rc = globus_l_condor_find_logfile(state); - if(rc == SEG_CONDOR_ERROR_LOG_NOT_PRESENT) - { - GlobusTimeReltimeSet(delay, 60, 0); - } - else - { - goto error; - } + SEGCondorDebug( + SEG_CONDOR_DEBUG_WARN, + ("Unexpected type for log file: %d\n", st.st_mode & S_IFMT)); } - result = globus_callback_register_oneshot( - NULL, - &delay, - globus_l_condor_read_callback, - state); - if (result != GLOBUS_SUCCESS) - { - goto error; - } + l = globus_list_sort(event_list, globus_l_event_sort, NULL); + globus_list_free(event_list); + event_list = l; + globus_l_condor_seg_emit_events(event_list); + globus_list_destroy_all(event_list, globus_l_condor_event_free); + + globus_l_condor_start_timestamp = sweep_time; + globus_l_condor_flush_timestamp = flush_timestamp; + SEGCondorExit(); return; + error: + free(path); globus_mutex_lock(&globus_l_condor_mutex); - if (shutdown_called) + if (globus_l_condor_shutdown_called) { - callback_count--; - - if (callback_count == 0) - { - globus_cond_signal(&globus_l_condor_cond); - } + globus_l_condor_callback_handle = 0; + globus_cond_signal(&globus_l_condor_cond); } globus_mutex_unlock(&globus_l_condor_mutex); SEGCondorExit(); return; } -/* globus_l_condor_read_callback() */ +/* globus_l_condor_poll_callback() */ -/** - * Determine the next available CONDOR log file name from the - * timestamp stored in the logfile state structure. - * - * @param state - * CONDOR log state structure. The path field of the structure may be - * modified by this function. - * - * @retval GLOBUS_SUCCESS - * Name of an log file name has been found and the file exists. - * @retval 1 - * Something bad occurred. - */ static int -globus_l_condor_find_logfile( - globus_l_condor_logfile_state_t * state) +globus_l_condor_process_file( + const char * full_path_name, + globus_bool_t flush_old_files, + time_t start_time, + time_t sweep_time, + time_t flush_timestamp, + globus_list_t ** event_list) { - struct stat s; + char * buffer; + struct stat statbuf[2]; int rc; - int save_errno = 0; - GlobusFuncName(globus_l_condor_find_logfile); + int fd; + struct flock fl; + globus_bool_t terminated; - SEGCondorEnter(); + rc = lstat(full_path_name, &statbuf[0]); + if (rc < 0) + { + goto error; + } - if (state->path == NULL) + if (!S_ISREG(statbuf[0].st_mode)) { - SEGCondorDebug(SEG_CONDOR_DEBUG_TRACE, ("allocating path\n")); + goto error; + } - globus_common_get_attribute_from_config_file(NULL, - "etc/globus-condor.conf", "log_path", &state->path); + if (statbuf[0].st_mtime >= start_time || flush_old_files) + { + fd = open(full_path_name, O_RDONLY); - if (state->path == NULL) + if (fd < 0) { - rc = SEG_CONDOR_ERROR_OUT_OF_MEMORY; goto error; } - } + rc = fstat(fd, &statbuf[1]); - do - { - rc = stat(state->path, &s); - - if (rc < 0) + if (rc != 0) { - save_errno = errno; - switch (errno) - { - case ENOENT: - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("missing log file\n")); - rc = SEG_CONDOR_ERROR_LOG_NOT_PRESENT; - goto error; - - case EACCES: - /* Permission problem (fatal) */ - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("permissions needed to access logfile %s\n", - state->path)); - rc = SEG_CONDOR_ERROR_LOG_PERMISSIONS; - goto error; - - case ENOTDIR: - case ELOOP: - case ENAMETOOLONG: - /* broken path (fatal) */ - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("broken path to logfile %s\n", - state->path)); - rc = SEG_CONDOR_ERROR_BAD_PATH; - goto error; - - case EFAULT: - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("bad pointer\n")); - globus_assert(errno != EFAULT); - - case EINTR: - case ENOMEM: /* low kernel mem */ - /* try again later */ - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("going to have to retry stat()\n")); - continue; - - default: - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("unexpected errno\n")); - rc = SEG_CONDOR_ERROR_UNKNOWN; - goto error; - } + goto open_file_error; } - } - while (rc != 0); - - if (rc != 0) - { - goto error; - } - - SEGCondorExit(); - return 0; -error: - if (state->path == NULL) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_ERROR, - ("Error retrieving log_path attribute from " - "$GLOBUS_LOCATION/etc/globus-condor.conf\n")); - } - else - { - errno = save_errno; - SEGCondorDebug(SEG_CONDOR_DEBUG_ERROR, - ("Error reading logfile %s: %s\n", - state->path, - strerror(save_errno))); - - } - SEGCondorExit(); - return rc; -} -/* globus_l_condor_find_logfile() */ - -/** - * Move any data in the state buffer to the beginning, to enable reusing - * buffer space which has already been parsed. - */ -static -int -globus_l_condor_clean_buffer( - globus_l_condor_logfile_state_t * state) -{ - int rc = 0; - GlobusFuncName(globus_l_condor_clean_buffer); + /* Somebody's playing games */ + if (statbuf[0].st_ino != statbuf[1].st_ino) + { + goto open_file_error; + } - SEGCondorEnter(); + /* Get read lock on the whole file */ + fl.l_start = 0; + fl.l_len = 0; + fl.l_type = F_RDLCK; + fl.l_whence = SEEK_SET; - /* move data to head of buffer */ - if (state->buffer != NULL) - { - if(state->buffer_point > 0) + do { - if (state->buffer_valid > 0) + rc = fcntl(fd, F_SETLKW, &fl); + + if (rc == -1 && errno != EINTR) { - memmove(state->buffer, - state->buffer+state->buffer_point, - state->buffer_valid); + goto open_file_error; } - state->buffer_point = 0; + } while (rc == -1); + + if (statbuf[1].st_size == 0) + { + goto open_file_error; } - rc = globus_l_condor_increase_buffer(state); - } - SEGCondorExit(); - return rc; -} -/* globus_l_condor_clean_buffer() */ -/** - * Reduce unused space in the log buffer, increasing the size of the buffer - * if it is full. - * - * @param state - * CONDOR log state structure. The buffer-related fields of the structure - * may be modified by this function. - */ -static -int -globus_l_condor_increase_buffer( - globus_l_condor_logfile_state_t * state) -{ - char * save = state->buffer; - const size_t GLOBUS_CONDOR_READ_BUFFER_SIZE = 4096; - int rc; - GlobusFuncName(globus_l_condor_increase_buffer); + buffer = mmap(NULL, (size_t) statbuf[1].st_size, PROT_READ, MAP_SHARED, fd, 0); + globus_assert(buffer != NULL); - SEGCondorEnter(); - /* If the buffer is full, resize */ - if (state->buffer_valid == state->buffer_length) - { - state->buffer = globus_libc_realloc(state->buffer, - state->buffer_length + GLOBUS_CONDOR_READ_BUFFER_SIZE); - if (state->buffer == NULL) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_ERROR, ("realloc() failed\n")); + globus_l_condor_parse_events( + buffer, statbuf[1].st_size, start_time, + sweep_time, event_list, &terminated); - rc = SEG_CONDOR_ERROR_OUT_OF_MEMORY; - goto error; + if ((statbuf[0].st_mtime < flush_timestamp) && + terminated && flush_old_files) + { + SEGCondorDebug(SEG_CONDOR_DEBUG_INFO, + ("Removing old file %s\n", full_path_name)); + remove(full_path_name); } - state->buffer_length += GLOBUS_CONDOR_READ_BUFFER_SIZE; + munmap(buffer, (size_t) statbuf[1].st_size); +open_file_error: + close(fd); } - SEGCondorExit(); - return 0; - error: - SEGCondorExit(); - state->buffer = save; return rc; } -/* globus_l_condor_increase_buffer() */ +/* globus_l_condor_process_file() */ +/* + * For each event in buffer + * keep track of # of submit events + * keep track of # of terminated or aborted events + * + * if event is newer or equal to oldest_time and before newest_time + * add to list + * + * If # of submit == # of terminated or aborted set terminated to true + */ static int globus_l_condor_parse_events( - globus_l_condor_logfile_state_t * state) + char * buffer, + size_t size, + time_t oldest_time, + time_t newest_time, + globus_list_t ** list, + globus_bool_t * terminated) { char * eot; int rc; char * p; + globus_scheduler_event_t * evt = NULL; + int procs_started = 0; + int procs_ended = 0; + ptrdiff_t len; GlobusFuncName(globus_l_condor_parse_events); SEGCondorEnter(); - state->buffer[state->buffer_point + state->buffer_valid] = '\0'; - - p = state->buffer + state->buffer_point; + p = buffer; - while (isspace(*p)) - { - p++; - } + *terminated = GLOBUS_FALSE; + while ((eot = strstr(p, "\n")) != NULL) { - *(eot+4) = '\0'; + len = eot+4 - p; - if (strncmp(p, "", 3) == 0) + if ((p = strstr(p, "")) != NULL) { - p += 3; - - rc = globus_l_condor_parse_event(p, state->start_timestamp); - } + evt = calloc(1, sizeof(globus_scheduler_event_t)); - state->buffer_valid -= eot + 4 - state->buffer - state->buffer_point; - state->buffer_point = eot + 4 - state->buffer; + rc = globus_l_condor_parse_event(p, len, evt); + if (rc < 0) + { + SEGCondorDebug(SEG_CONDOR_DEBUG_TRACE, ("failed parsing event\n", evt->job_id, evt->event_type)); + goto next; + } - if (state->buffer_valid > 0) + if (evt->event_type == GLOBUS_SCHEDULER_EVENT_PENDING) + { + procs_started++; + } + else if (evt->event_type == GLOBUS_SCHEDULER_EVENT_FAILED || + evt->event_type == GLOBUS_SCHEDULER_EVENT_DONE) + { + procs_ended++; + } + if (evt->timestamp >= oldest_time && + evt->timestamp < newest_time) + { + SEGCondorDebug( + SEG_CONDOR_DEBUG_TRACE, + ("keeping event: %s %d\n", evt->job_id, evt->event_type)); + globus_list_insert(list, evt); + evt = NULL; + } + else + { + SEGCondorDebug( + SEG_CONDOR_DEBUG_TRACE, + ("dropping event: %s %d time range is %ld to %ld, this is %ld\n", evt->job_id, evt->event_type, oldest_time, newest_time, evt->timestamp)); + + free(evt->job_id); + free(evt); + evt = NULL; + } + } + else { - state->buffer_valid--; - state->buffer_point++; + goto out; } +next: + p = eot+5; + } - p = state->buffer + state->buffer_point; + if (procs_started > 0 && procs_started == procs_ended) + { + *terminated = GLOBUS_TRUE; } +out: SEGCondorExit(); return 0; } /* globus_l_condor_parse_events() */ - static int globus_l_condor_parse_event( char * buffer, - time_t start_timestamp) + size_t len, + globus_scheduler_event_t * evt) { - char * p; - char * attr; - char * tmp; - int event_type_number; - char * event_time; int cluster; int proc; int subproc; - globus_bool_t terminated_normally; - int return_value = 0; - struct tm event_tm; - time_t event_stamp; - char * jobid; - int jobid_len; - int len; - globus_result_t result; + struct tm tm; + int rc = 0; + xmlTextReaderPtr reader; + const xmlChar *condor_attr = NULL, *type, *tmp; + const xmlChar *EventTypeNumber, *EventTime, + *Cluster, *Proc, *Subproc, + *TerminatedNormally, *ReturnValue; + char fmt[] = "parsing event %.9223372036854775807s\n"; - enum condor_attr_e - { - DONTCARE, - EVENT_TYPE_NUMBER, - EVENT_TIME, - CLUSTER, - PROC, - SUBPROC, - TERMINATED_NORMALLY, - RETURN_VALUE - } condor_attr; - typedef enum - { - CONDOR_STRING, - CONDOR_INTEGER, - CONDOR_BOOLEAN, - CONDOR_REAL - } condor_parse_type_t; - union - { - condor_parse_type_t type; - - struct - { - condor_parse_type_t type; - char * s; - } s; - - struct - { - condor_parse_type_t type; - int i; - } i; - - struct - { - condor_parse_type_t type; - globus_bool_t b; - } b; - - struct - { - condor_parse_type_t type; - float r; - } r; - } pu; GlobusFuncName(globus_l_condor_parse_event); - SEGCondorEnter(); +#define SEG_CONDOR_ASSERT_TYPE(t) \ + if (type[0] != #t[0] && type[1] != 0) \ + { \ + SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, \ + ("Parse error: unexpected type %s for %s\n", \ + type, condor_attr)); \ + rc = -1 ; \ + goto free_reader_out; \ + } + + sprintf(fmt, "parsing event %%.%zus\n", len); SEGCondorDebug(SEG_CONDOR_DEBUG_TRACE, - ("parsing event %s\n", buffer)); - p = buffer; + (fmt, buffer)); - while(isspace(*p)) + reader = xmlReaderForMemory(buffer, len, NULL, NULL, 0); + if (reader == NULL) { - p++; + return -1; } - while ((strncmp(p, " */ + if (xmlTextReaderRead(reader) != 1) { - p += 6; /* [[ elements */ + while (xmlTextReaderRead(reader) == 1) + { + /* Read element, ignoring whitespace and */ + while (xmlTextReaderNodeType(reader) != XML_READER_TYPE_ELEMENT) { - p++; + if (xmlTextReaderRead(reader) != 1) + { + rc = 0; + goto free_reader_out; + } } - if (!*p) + + type = xmlTextReaderConstLocalName(reader); + if (strcmp((char *) type, "a") != 0) { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("short buffer")); - return 1; + rc = -1; + + goto free_reader_out; } - *(p++) = '\0'; - if (*p == '\0') + + /* Determine n attribute's value */ + if (xmlTextReaderMoveToAttribute(reader, BAD_CAST "n") != 1) { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("short buffer")); - return 1; + rc = -1; + goto free_reader_out; } - p++; - if (strcmp(attr, "EventTypeNumber") == 0) + condor_attr = xmlTextReaderConstString( + reader, xmlTextReaderConstValue(reader)); + + /* Move to type node (i, s, r, b) */ + do { - condor_attr = EVENT_TYPE_NUMBER; - } - else if (strcmp(attr, "EventTime") == 0) + if (xmlTextReaderRead(reader) != 1) + { + rc = 0; + goto free_reader_out; + } + } while (xmlTextReaderNodeType(reader) != XML_READER_TYPE_ELEMENT); + + type = xmlTextReaderConstLocalName(reader); + if (strcmp((char *) type, "i") == 0 || + strcmp((char *) type, "r") == 0 || + strcmp((char *) type, "s") == 0) { - condor_attr = EVENT_TIME; + do + { + if (xmlTextReaderRead(reader) != 1) + { + rc = 0; + goto free_reader_out; + } + } while (xmlTextReaderNodeType(reader) != XML_READER_TYPE_TEXT); + + tmp = xmlTextReaderConstValue(reader); } - else if (strcmp(attr, "Cluster") == 0) + else if (strcmp((char *) type, "b") == 0) { - condor_attr = CLUSTER; + if (xmlTextReaderMoveToAttribute(reader, BAD_CAST "v") != 1) + { + rc = -1; + goto free_reader_out; + } + tmp = xmlTextReaderConstValue(reader); } - else if (strcmp(attr, "Proc") == 0) + + if (condor_attr == EventTypeNumber) { - condor_attr = PROC; + SEG_CONDOR_ASSERT_TYPE(i); + + switch (atoi((char *) tmp)) + { + case 0: /* SubmitEvent */ + evt->event_type = GLOBUS_SCHEDULER_EVENT_PENDING; + break; + + case 1: /* ExecuteEvent */ + evt->event_type = GLOBUS_SCHEDULER_EVENT_ACTIVE; + break; + + case 5: /* JobTerminatedEvent */ + evt->event_type = GLOBUS_SCHEDULER_EVENT_DONE; + break; + case 9: /* JobAbortedEvent */ + evt->event_type = GLOBUS_SCHEDULER_EVENT_FAILED; + break; + default: + /* Doesn't map directly to SEG event type */ + rc = -1; + goto free_reader_out; + } } - else if (strcmp(attr, "Subproc") == 0) + else if (condor_attr == EventTime) { - condor_attr = SUBPROC; + SEG_CONDOR_ASSERT_TYPE(s); + memset(&tm, 0, sizeof(struct tm)); + /* 2008-01-23T12:13:20 */ + strptime((char *) tmp, "%Y-%m-%dT%H:%M:%S", &tm); + evt->timestamp = mktime(&tm); } - else if (strcmp(attr, "TerminatedNormally") == 0) + else if (condor_attr == Cluster) { - condor_attr = TERMINATED_NORMALLY; + SEG_CONDOR_ASSERT_TYPE(i); + cluster = atoi((char *) tmp); } - else if (strcmp(attr, "ReturnValue") == 0) + else if (condor_attr == Proc) { - condor_attr = RETURN_VALUE; + SEG_CONDOR_ASSERT_TYPE(i); + proc = atoi((char *) tmp); } - else + else if (condor_attr == Subproc) { - condor_attr = DONTCARE; + SEG_CONDOR_ASSERT_TYPE(i); + subproc = atoi((char *) tmp); } - if (strncmp(p, "", 3) == 0) + else if (condor_attr == TerminatedNormally) { - /* String value */ - p += 3; - - if (*p == '\0') - { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("short buffer")); - return 1; - } - pu.type = CONDOR_STRING; - pu.s.s = p; - p = strstr(p, ""); - *p = '\0'; - p += 4; - - } - else if (strncmp(p, "", 3) == 0) - { - /* Integer value */ - p += 3; - if (*p == '\0') - { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("short buffer")); - return 1; - } - pu.type = CONDOR_INTEGER; - tmp = p; - p = strstr(p, ""); - *p = '\0'; - p += 4; - pu.i.i = atoi(tmp); - } - else if (strncmp(p, "", 10) == 0) - { - /* Boolean true value */ - p += 10; - pu.type = CONDOR_BOOLEAN; - pu.b.b = GLOBUS_TRUE; - } - else if (strncmp(p, "", 10) == 0) - { - /* Boolean false value */ - p += 10; - pu.type = CONDOR_BOOLEAN; - pu.b.b = GLOBUS_FALSE; - } - else if (strncmp(p, "", 3) == 0) - { - /* Real value */ - p += 3; - if (*p == '\0') + SEG_CONDOR_ASSERT_TYPE(b); + if (strcmp((char *) tmp, "t") == 0) { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("short buffer")); - return 1; + evt->event_type = GLOBUS_SCHEDULER_EVENT_DONE; } - pu.type = CONDOR_REAL; - tmp = p; - sscanf(p, "%f%n", &pu.r.r, &len); - p += len; - if (*p == '\0') - { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("short buffer")); - return 1; - } - if (strncmp(p, "", 4) != 0) + else { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("expected , got %s\n", p)); - return 1; + evt->event_type = GLOBUS_SCHEDULER_EVENT_FAILED; } - *p = '\0'; - p += 4; + } + else if (condor_attr == ReturnValue) + { + SEG_CONDOR_ASSERT_TYPE(i); + evt->exit_code = atoi((char *) tmp); + evt->failure_code = atoi((char *) tmp); } else { - SEGCondorDebug(SEG_CONDOR_DEBUG_ERROR, - ("unknown token at %s\n", p)); - break; + SEGCondorDebug(SEG_CONDOR_DEBUG_INFO, + ("Unexpected condor_attr: %s\n", + condor_attr)); } + } - switch (condor_attr) - { - case EVENT_TYPE_NUMBER: - if (pu.type != CONDOR_INTEGER) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("EventTypeNumber: expected int, got %d\n", pu.type)); - break; +free_reader_out: + if (rc == 0) + { + evt->job_id = globus_common_create_string( + "%03d.%03d.%03d", cluster, proc, subproc); + } + xmlFreeTextReader(reader); + return rc; +} +/* globus_l_condor_parse_event() */ - } - event_type_number = pu.i.i; - break; - case EVENT_TIME: - if (pu.type != CONDOR_STRING) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("EventTime: expected string, got %d\n", pu.type)); - break; - } - event_time = pu.s.s; +static +int +globus_l_event_sort(void *lo, void *hi, void *ignore) +{ + globus_scheduler_event_t *evl = lo, *evh = hi; - sscanf(event_time, "%04d-%02d-%02dT%2d:%2d:%2d", - &event_tm.tm_year, - &event_tm.tm_mon, - &event_tm.tm_mday, - &event_tm.tm_hour, - &event_tm.tm_min, - &event_tm.tm_sec); - - event_tm.tm_year -= 1900; - event_tm.tm_mon -= 1; - event_tm.tm_isdst = -1; + return (evl->timestamp < evh->timestamp); +} +/* globus_l_event_sort() */ - event_stamp = mktime(&event_tm); +static +void +globus_l_condor_seg_emit_events(globus_list_t *event_list) +{ + globus_scheduler_event_t * evt; - break; - case CLUSTER: - if (pu.type != CONDOR_INTEGER) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("Cluster: expected int, got %d\n", pu.type)); - break; + while (!globus_list_empty(event_list)) + { + evt = globus_list_first(event_list); + event_list = globus_list_rest(event_list); - } - cluster = pu.i.i; + switch (evt->event_type) + { + case GLOBUS_SCHEDULER_EVENT_PENDING: + globus_scheduler_event_pending(evt->timestamp, evt->job_id); break; - case PROC: - if (pu.type != CONDOR_INTEGER) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("Proc: expected int, got %d\n", pu.type)); - break; - } - proc = pu.i.i; + case GLOBUS_SCHEDULER_EVENT_ACTIVE: + globus_scheduler_event_active(evt->timestamp, evt->job_id); break; - case SUBPROC: - if (pu.type != CONDOR_INTEGER) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("Subproc: expected int, got %d\n", pu.type)); - break; - } - subproc = pu.i.i; + case GLOBUS_SCHEDULER_EVENT_DONE: + globus_scheduler_event_done(evt->timestamp, evt->job_id, + evt->exit_code); break; - case TERMINATED_NORMALLY: - if (pu.type != CONDOR_BOOLEAN) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("TerminatedNormally: expected bool, got %d\n", pu.type)); - break; - } - terminated_normally = pu.b.b; + case GLOBUS_SCHEDULER_EVENT_FAILED: + globus_scheduler_event_failed(evt->timestamp, evt->job_id, + evt->failure_code); break; - case RETURN_VALUE: - if (pu.type != CONDOR_INTEGER) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("ReturnValue: expected int, got %d\n", pu.type)); - break; - - } - return_value = pu.i.i; - break; - case DONTCARE: default: - SEGCondorDebug(SEG_CONDOR_DEBUG_TRACE, - ("Ignoring attribute %s\n", attr)); break; } - if (strncmp(p, "", 4) != 0) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("missing at %s\n", p)); - return 1; - } - p += 4; - - while (isspace(*p)) - { - p++; - } - } - - if (event_stamp < start_timestamp) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_TRACE, - ("ignoring old event type %d for job %03d.%03d.%03d\n", - event_type_number, cluster, proc, subproc)); - - return 0; } +} +/* globus_l_condor_seg_emit_events() */ - jobid_len = globus_libc_printf_length( - "%03d.%03d.%03d", cluster, proc, subproc); - - jobid = malloc(jobid_len+1); - sprintf(jobid, "%03d.%03d.%03d", cluster, proc, subproc); +static +void +globus_l_condor_event_free(void * e) +{ + globus_scheduler_event_t * event = e; - switch (event_type_number) + if (event) { - case 0: /* SubmitEvent */ - result = globus_scheduler_event_pending(event_stamp, jobid); - - if (result != GLOBUS_SUCCESS) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("Unable to send pending event: %s\n", - globus_object_printable_to_string( - globus_error_peek(result)))); - } - break; - case 1: /* ExecuteEvent */ - result = globus_scheduler_event_active(event_stamp, jobid); - - if (result != GLOBUS_SUCCESS) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("Unable to send pending event: %s\n", - globus_object_printable_to_string( - globus_error_peek(result)))); - } - break; - - case 5: /* JobTerminatedEvent */ - if (terminated_normally) - { - result = globus_scheduler_event_done(event_stamp, jobid, - return_value); - - if (result != GLOBUS_SUCCESS) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("Unable to send done event: %s\n", - globus_object_printable_to_string( - globus_error_peek(result)))); - } - } - else + if (event->job_id) { - case 9: /* JobAbortedEvent */ - result = globus_scheduler_event_failed(event_stamp, jobid, - return_value); - - if (result != GLOBUS_SUCCESS) - { - SEGCondorDebug(SEG_CONDOR_DEBUG_WARN, - ("Unable to send failed event: %s\n", - globus_object_printable_to_string( - globus_error_peek(result)))); - } + free(event->job_id); } - break; + free(event); } - globus_libc_free(jobid); - - return 0; } -/* globus_l_condor_parse_event() */ +/* globus_l_condor_event_free() */ Index: wsrf-cvs/ws-gram/job_monitoring/condor/c/source/pkgdata/pkg_data_src.gpt.in =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/condor/c/source/pkgdata/pkg_data_src.gpt.in,v retrieving revision 1.5.2.5 retrieving revision 1.5.2.5.10.1 diff -u -r1.5.2.5 -r1.5.2.5.10.1 --- wsrf-cvs/ws-gram/job_monitoring/condor/c/source/pkgdata/pkg_data_src.gpt.in 31 Oct 2006 14:02:23 -0000 1.5.2.5 +++ wsrf-cvs/ws-gram/job_monitoring/condor/c/source/pkgdata/pkg_data_src.gpt.in 28 Jan 2008 21:15:55 -0000 1.5.2.5.10.1 @@ -3,7 +3,7 @@ - + Scheduler Event Generator Condor Module ResourceManagement @@ -16,11 +16,13 @@ + + Index: wsrf-cvs/ws-gram/job_monitoring/condor/c/test/dirt.sh =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/condor/c/test/dirt.sh,v retrieving revision 1.8 retrieving revision 1.8.62.1 diff -u -r1.8 -r1.8.62.1 --- wsrf-cvs/ws-gram/job_monitoring/condor/c/test/dirt.sh 24 Mar 2005 15:05:16 -0000 1.8 +++ wsrf-cvs/ws-gram/job_monitoring/condor/c/test/dirt.sh 28 Jan 2008 21:15:55 -0000 1.8.62.1 @@ -1,2 +1,2 @@ -DIRT_TIMESTAMP=1111676716 -DIRT_BRANCH_ID=1 +DIRT_TIMESTAMP=1201554955 +DIRT_BRANCH_ID=0 Index: wsrf-cvs/ws-gram/job_monitoring/condor/c/test/test-condor-seg.pl =================================================================== RCS file: /home/globdev/CVS/globus-packages/ws-gram/job_monitoring/condor/c/test/test-condor-seg.pl,v retrieving revision 1.6 retrieving revision 1.6.88.1 diff -u -r1.6 -r1.6.88.1 --- wsrf-cvs/ws-gram/job_monitoring/condor/c/test/test-condor-seg.pl 24 Mar 2005 15:05:14 -0000 1.6 +++ wsrf-cvs/ws-gram/job_monitoring/condor/c/test/test-condor-seg.pl 28 Jan 2008 21:15:55 -0000 1.6.88.1 @@ -1,12 +1,13 @@ #! /usr/bin/perl use IO::File; -use File::Path; use File::Compare; +use File::Temp; use Test; +use strict; my $start = time(); -my $testtmp = &make_tmpdir(); +my $testtmp = File::Temp::tempdir(CLEANUP => 1); my @log_data; my $skip_all = 0; my $gram_condor_conf = $ENV{GLOBUS_LOCATION} . "/etc/globus-condor.conf"; @@ -27,7 +28,7 @@ sub run_test { if (! $skip_all) { - @test_data = &parse_test_data(); + my @test_data = &parse_test_data(); &write_test_data_to_log($log_path, @test_data); my $rc = &run_condor_seg("$testtmp/output"); @@ -54,7 +55,7 @@ '/libexec/globus-scheduler-event-generator'; my @args = ($seg, '-s', 'condor', '-t', $start); my $pid2 = open(FH, "|-"); - my $size; + my $size = 0; if ($pid2 == 0) { @@ -66,7 +67,7 @@ do { $size = -s $output; - sleep(5); + sleep(20); } while ($size < (-s $output)); close(FH); @@ -85,7 +86,7 @@ my $state; chomp; - ($sleep, $jobid, $type) = split(/;/, $_); + my ($sleep, $jobid, $type) = split(/;/, $_); push (@result, [$sleep, $jobid, $type]); } @@ -94,6 +95,8 @@ sub write_test_data_to_log { my $path = shift; + my @test_data = @_; + my $mtime = $start; truncate($path, 0); my $last_sleep = 0; @@ -103,7 +106,7 @@ my $datestring; my $event; - ($sleep, $jobid, $type) = ($_->[0], $_->[1], $_->[2]); + my ($sleep, $jobid, $type) = ($_->[0], $_->[1], $_->[2]); #sleep($sleep - $last_sleep); $last_sleep = $sleep; @@ -192,7 +195,9 @@ printf LOG "%03d;%d;%03d.000.000;%d;%d\n", 1, $start + $sleep, $jobid, $state, 0; close(LOG); + $mtime = $start+$sleep; } + utime $mtime, $mtime, $path; } sub get_log_path { @@ -207,8 +212,6 @@ my ($var, $val) = split(/\s*=\s*/, $_); if ($var =~ m/^log_path$/) { print TMP_CONF "log_path=$log\n"; - } else { - print TMP_CONF "$_\n"; } } close(CONF); @@ -217,50 +220,8 @@ return $log; } -sub make_tmpdir -{ - my $root; - my $suffix = '/seg_condor_test'; - my $created = 0; - my $tmpname; - my @acceptable = split(//, "abcdefghijklmnopqrstuvwxyz". - "ABCDEFGHIJKLMNOPQRSTUVWXYZ". - "0123456789"); - if(exists($ENV{TMPDIR})) - { - $root = $ENV{TMPDIR}; - } - else - { - $root = '/tmp'; - } - while($created == 0) - { - $tmpname = $root . $suffix . - $acceptable[rand() * $#acceptable] . - $acceptable[rand() * $#acceptable] . - $acceptable[rand() * $#acceptable] . - $acceptable[rand() * $#acceptable] . - $acceptable[rand() * $#acceptable] . - $acceptable[rand() * $#acceptable]; - $created = mkdir($tmpname, 0700); - if($created) - { - if(-l $tmpname or ! -d $tmpname or ! -o $tmpname) - { - $created = 0; - } - } - } - return $tmpname; -} - END { - if(-d $testtmp and -o $testtmp) - { - File::Path::rmtree($testtmp); - } if (-f $gram_condor_conf_save) { rename($gram_condor_conf_save, $gram_condor_conf); Index: gt2-cvs/gram/jobmanager/setup/condor/condor.in =================================================================== RCS file: /home/globdev/CVS/globus-packages/gram/jobmanager/setup/condor/condor.in,v retrieving revision 1.15.6.9 retrieving revision 1.15.6.9.2.1 diff -u -r1.15.6.9 -r1.15.6.9.2.1 --- gt2-cvs/gram/jobmanager/setup/condor/condor.in 11 Dec 2007 01:46:50 -0000 1.15.6.9 +++ gt2-cvs/gram/jobmanager/setup/condor/condor.in 28 Jan 2008 21:15:53 -0000 1.15.6.9.2.1 @@ -26,11 +26,13 @@ my $proto = shift; my $class = ref($proto) || $proto; my $self = $class->SUPER::new(@_); - my $log_dir; my $description = $self->{JobDescription}; my $stdout = $description->stdout(); my $stderr = $description->stderr(); my $globus_condor_conf = "$ENV{GLOBUS_LOCATION}/etc/globus-condor.conf"; + my $log_dir = ''; + my $log_perms = 0666; + my $clog; if (-r $globus_condor_conf) { @@ -42,9 +44,19 @@ chomp; if (m/log_path=(.*)$/) { $self->{condor_logfile} = $1; - break; + } elsif (m/log_dir=(.*$)/) { + $log_dir = $1; } } + + if ($log_dir ne '') + { + $clog = $description->jobdir(); + $clog =~ s|.*/||; + $self->{condor_logfile} = + $log_dir . '/' . $clog; + $log_perms = 0644; + } close(FH); } } @@ -73,7 +85,7 @@ { close(CONDOR_LOG_FILE); } - chmod(0666, $self->{condor_logfile}); + chmod($log_perms, $self->{condor_logfile}); } if($description->jobtype() eq 'multiple' && $description->count > 1) Index: gt2-cvs/gram/jobmanager/setup/condor/dirt.sh =================================================================== RCS file: /home/globdev/CVS/globus-packages/gram/jobmanager/setup/condor/dirt.sh,v retrieving revision 1.41.6.20 retrieving revision 1.41.6.20.2.2 diff -u -r1.41.6.20 -r1.41.6.20.2.2 --- gt2-cvs/gram/jobmanager/setup/condor/dirt.sh 11 Dec 2007 01:46:50 -0000 1.41.6.20 +++ gt2-cvs/gram/jobmanager/setup/condor/dirt.sh 28 Jan 2008 21:15:54 -0000 1.41.6.20.2.2 @@ -1,2 +1,2 @@ -DIRT_TIMESTAMP=1197337610 -DIRT_BRANCH_ID=63 +DIRT_TIMESTAMP=1201554954 +DIRT_BRANCH_ID=0 Index: gt2-cvs/gram/jobmanager/setup/condor/pkgdata/pkg_data_src.gpt.in =================================================================== RCS file: /home/globdev/CVS/globus-packages/gram/jobmanager/setup/condor/pkgdata/pkg_data_src.gpt.in,v retrieving revision 1.16.6.11 retrieving revision 1.16.6.11.2.1 diff -u -r1.16.6.11 -r1.16.6.11.2.1 --- gt2-cvs/gram/jobmanager/setup/condor/pkgdata/pkg_data_src.gpt.in 11 Dec 2007 01:46:50 -0000 1.16.6.11 +++ gt2-cvs/gram/jobmanager/setup/condor/pkgdata/pkg_data_src.gpt.in 28 Jan 2008 21:15:54 -0000 1.16.6.11.2.1 @@ -3,7 +3,7 @@ - + Condor Job Manager Setup ResourceManagement