From 35eaff9760b3f6c2f1b362e5de0013b6ddfe9424 Mon Sep 17 00:00:00 2001 From: Kevin Day Date: Sun, 4 Apr 2021 16:52:57 -0500 Subject: [PATCH] Bugfix: wait condition issues, memory addressing issues, and exiting issues. The thread conditions, when waiting, may never stop waiting. Switch to a timed wait, allowing for the condition to determine if it should stop waiting. I attempted to signal the waits on exit, but this didn't work (however, this will still signal on exit). Make sure to join all threads, and add a failsafe thread join in the process delete function. The array of processes is a single array. When this array gets reallocated (resizing to a larger size) the memory addresses may change. This is a serious problem in that the threads are using those addresses and they suddenly change. The threads have no way of switching to the new addresses and memory problems happen. Redesign the array of processes to an array of pointers to processes. I considered using a block structure, but decided to keep it simple for now. The downside of this is that on every resize, there must be another allocation for the process address being pointed to. In the future, I may consider switching to a block structure where I allocate multiple blocks at a time, while still using pointers. Rename "id_process" to "id_child" to avoid potential confusion between "processes" and "pids", given that I am using "process" with a very different context in this project. Update all timeouts to be stored in macros. The cleanup and exit functions were deadlocking; change the locking usage in these functions. Add missing increment at the end of the loop when processing entry items. 
--- level_3/controller/c/private-common.c | 64 ++++++++++++++++++++++--- level_3/controller/c/private-common.h | 21 ++++++++- level_3/controller/c/private-controller.c | 9 ++-- level_3/controller/c/private-rule.c | 57 ++++++++++++---------- level_3/controller/c/private-thread.c | 78 ++++++++++++++++++++++--------- 5 files changed, 172 insertions(+), 57 deletions(-) diff --git a/level_3/controller/c/private-common.c b/level_3/controller/c/private-common.c index b38b539..a2ccf19 100644 --- a/level_3/controller/c/private-common.c +++ b/level_3/controller/c/private-common.c @@ -166,10 +166,16 @@ extern "C" { #ifndef _di_controller_process_delete_simple_ void controller_process_delete_simple(controller_process_t *process) { + if (process->id_thread) { + f_thread_join(process->id_thread, 0); + } + + f_thread_condition_signal_all(&process->wait); + f_thread_condition_delete(&process->wait); + controller_lock_delete_rw(&process->lock); controller_lock_delete_rw(&process->active); controller_lock_delete_mutex(&process->wait_lock); - f_thread_condition_delete(&process->wait); controller_cache_delete_simple(&process->cache); controller_rule_delete_simple(&process->rule); @@ -178,6 +184,39 @@ extern "C" { } #endif // _di_controller_process_delete_simple_ +#ifndef _di_controller_process_wait_ + void controller_process_wait(const controller_main_t main, controller_process_t *process) { + + if (!main.thread->enabled) return; + + struct timespec time; + time.tv_sec = controller_thread_wait_timeout_seconds; + time.tv_nsec = controller_thread_wait_timeout_nanoseconds; + + f_status_t status = F_none; + + do { + f_thread_mutex_lock(&process->wait_lock); + + status = f_thread_condition_wait_timed(&time, &process->wait, &process->wait_lock); + + f_thread_mutex_unlock(&process->wait_lock); + + if (!main.thread->enabled) break; + + f_thread_lock_read(&process->lock); + + if (process->status != F_known_not || !(process->state == controller_process_state_active || process->state == 
controller_process_state_busy)) { + f_thread_unlock(&process->lock); + + break; + } + + f_thread_unlock(&process->lock); + } while (main.thread->enabled); + } +#endif // _di_controller_process_wait_ + #ifndef _di_controller_processs_delete_simple_ void controller_processs_delete_simple(controller_processs_t *processs) { @@ -212,28 +251,41 @@ extern "C" { f_status_t status = F_none; for (f_array_length_t i = length; i < processs->size; ++i) { - controller_process_delete_simple(&processs->array[i]); + + if (processs->array[i]) { + controller_process_delete_simple(processs->array[i]); + + f_memory_delete(1, sizeof(f_array_length_t *), (void **) & processs->array[i]); + } } // for status = f_memory_resize(processs->size, length, sizeof(controller_process_t), (void **) & processs->array); if (F_status_is_error_not(status) && length) { + controller_process_t *process = 0; + // the lock must be initialized, but only once, so initialize immediately upon allocation. for (; processs->size < length; ++processs->size) { - status = f_thread_lock_create(0, &processs->array[processs->size].lock); + status = f_memory_new(1, sizeof(controller_process_t), (void **) &processs->array[processs->size]); + + if (F_status_is_error_not(status)) { + process = processs->array[processs->size]; + + status = f_thread_lock_create(0, &process->lock); + } if (F_status_is_error_not(status)) { - status = f_thread_lock_create(0, &processs->array[processs->size].active); + status = f_thread_lock_create(0, &process->active); } if (F_status_is_error_not(status)) { - status = f_thread_condition_create(0, &processs->array[processs->size].wait); + status = f_thread_condition_create(0, &process->wait); } if (F_status_is_error_not(status)) { - status = f_thread_mutex_create(0, &processs->array[processs->size].wait_lock); + status = f_thread_mutex_create(0, &process->wait_lock); } if (F_status_is_error(status)) { diff --git a/level_3/controller/c/private-common.h b/level_3/controller/c/private-common.h index 
a90a802..2b27685 100644 --- a/level_3/controller/c/private-common.h +++ b/level_3/controller/c/private-common.h @@ -828,13 +828,15 @@ extern "C" { /** * The Rule Processes. * + * Each process is a pointer of a process, to preserve memory locations that may ultimately change due to the resizing the array. + * * array: An array of rule processes. * size: Total amount of allocated space. * used: Total number of allocated spaces used. */ #ifndef _di_controller_processs_t_ typedef struct { - controller_process_t *array; + controller_process_t **array; f_array_length_t size; f_array_length_t used; @@ -1073,6 +1075,9 @@ extern "C" { //#define controller_thread_exit_force_timeout 60 // 1 minute in seconds. #define controller_thread_exit_process_force_timeout 2000000 // 2 seconds in microseconds. #define controller_thread_exit_main_force_timeout 100000 // 0.1 seconds in microseconds. + #define controller_thread_simulation_timeout 200000 // 0.2 seconds in microseconds. + #define controller_thread_wait_timeout_seconds 10 + #define controller_thread_wait_timeout_nanoseconds 0 typedef struct { bool enabled; @@ -1327,6 +1332,20 @@ extern "C" { extern void controller_process_delete_simple(controller_process_t *process) f_gcc_attribute_visibility_internal; #endif // _di_controller_process_delete_simple_ +/*** + * Safely wait for a process, periodically checking to see if process completed or check if exiting. + * + * @param main + * The main data. + * @param process + * The process to wait on. + * + * @see f_thread_condition_wait_timed() + */ +#ifndef _di_controller_process_wait_ + extern void controller_process_wait(const controller_main_t main, controller_process_t *process) f_gcc_attribute_visibility_internal; +#endif // _di_controller_process_wait_ + /** * Fully deallocate all memory for the given processs without caring about return status. 
* diff --git a/level_3/controller/c/private-controller.c b/level_3/controller/c/private-controller.c index 7222dca..8c69c00 100644 --- a/level_3/controller/c/private-controller.c +++ b/level_3/controller/c/private-controller.c @@ -251,7 +251,7 @@ extern "C" { for (f_array_length_t i = 0; i < processs.used; ++i) { - if (fl_string_dynamic_compare(alias, processs.array[i].rule.alias) == F_equal_to) { + if (processs.array[i] && fl_string_dynamic_compare(alias, processs.array[i]->rule.alias) == F_equal_to) { if (at) *at = i; return F_true; } @@ -1045,10 +1045,10 @@ extern "C" { if (F_status_is_error(status)) { controller_entry_error_print(main.data->error, cache->action, F_status_set_fine(status), "controller_processs_increase", F_true, main.thread); } - else { + else if (main.thread->processs.array[main.thread->processs.used]) { // only copy the rule alias, as that is all that is needed at this point (the entire rule gets copied prior to executing/processing). - controller_process_t *process = &main.thread->processs.array[main.thread->processs.used]; + controller_process_t *process = main.thread->processs.array[main.thread->processs.used]; f_thread_lock_write(&process->lock); @@ -1225,6 +1225,9 @@ extern "C" { break; } } + else { + cache->ats.array[at_j]++; + } } // for if (status == F_child || status == F_signal) { diff --git a/level_3/controller/c/private-rule.c b/level_3/controller/c/private-rule.c index 97183d1..af5724b 100644 --- a/level_3/controller/c/private-rule.c +++ b/level_3/controller/c/private-rule.c @@ -978,7 +978,7 @@ extern "C" { f_status_t status = F_none; int result = 0; - pid_t id_process = 0; + pid_t id_child = 0; if (options & controller_rule_option_simulate) { if (main.data->error.verbosity != f_console_verbosity_quiet) { @@ -1001,16 +1001,16 @@ extern "C" { } // sleep for less than a second to better show simulation of synchronous vs asynchronous. 
- usleep(200000); + usleep(controller_thread_simulation_timeout); const f_string_static_t simulated_program = f_macro_string_static_t_initialize(f_string_empty_s, 0); const f_string_statics_t simulated_arguments = f_string_statics_t_initialize; fl_execute_parameter_t simulated_parameter = fl_macro_execute_parameter_t_initialize(execute_set->parameter.option, execute_set->parameter.wait, execute_set->parameter.environment, execute_set->parameter.signals, &simulated_program); - status = fll_execute_program(controller_default_program_script, simulated_arguments, &simulated_parameter, &execute_set->as, simulated_parameter.option & fl_execute_parameter_option_return ? (void *) &result : (void *) &id_process); + status = fll_execute_program(controller_default_program_script, simulated_arguments, &simulated_parameter, &execute_set->as, simulated_parameter.option & fl_execute_parameter_option_return ? (void *) &result : (void *) &id_child); } else { - status = fll_execute_program(program, arguments, &execute_set->parameter, &execute_set->as, execute_set->parameter.option & fl_execute_parameter_option_return ? (void *) &result : (void *) &id_process); + status = fll_execute_program(program, arguments, &execute_set->parameter, &execute_set->as, execute_set->parameter.option & fl_execute_parameter_option_return ? (void *) &result : (void *) &id_child); } if (status == F_parent) { @@ -1020,13 +1020,13 @@ extern "C" { f_thread_lock_write(&process->lock); // assign the child process id to allow for the cancel process to send appropriate termination signals to the child process. - process->child = id_process; + process->child = id_child; f_thread_unlock(&process->lock); f_thread_lock_read(&process->lock); // have the parent wait for the child process to finish. 
- waitpid(id_process, &result, 0); + waitpid(id_child, &result, 0); f_thread_unlock(&process->lock); f_thread_lock_write(&process->lock); @@ -1123,7 +1123,7 @@ extern "C" { } // sleep for less than a second to better show simulation of synchronous vs asynchronous. - usleep(200000); + usleep(controller_thread_simulation_timeout); const f_string_static_t simulated_program = f_macro_string_static_t_initialize(f_string_empty_s, 0); const f_string_statics_t simulated_arguments = f_string_statics_t_initialize; @@ -1147,13 +1147,13 @@ extern "C" { f_thread_unlock(&process->lock); f_thread_lock_read(&process->lock); - // have the parent wait for the child process to finish. + // have the parent wait for the child process to finish. @todo do not wait, this is a background execution! waitpid(id_process, &result, 0); f_thread_unlock(&process->lock); f_thread_lock_write(&process->lock); - // remove the pid now that waidpid() has returned. + // remove the pid now that waidpid() has returned. @todo do not clear until forked execution is known to have exited, this is a background execution process->child = 0; f_thread_unlock(&process->lock); @@ -1729,7 +1729,7 @@ extern "C" { f_thread_unlock(&main.thread->lock.rule); f_thread_lock_read(&main.thread->lock.process); - process_other = &main.thread->processs.array[id_process]; + process_other = main.thread->processs.array[id_process]; f_thread_lock_read(&process_other->active); f_thread_lock_read(&process_other->lock); @@ -1737,11 +1737,7 @@ extern "C" { if (process_other->status == F_known_not && (process_other->state == controller_process_state_active || process_other->state == controller_process_state_busy)) { f_thread_unlock(&process_other->lock); - if (main.thread->enabled) { - f_thread_mutex_lock(&process_other->wait_lock); - f_thread_condition_wait(&process_other->wait, &process_other->wait_lock); - f_thread_mutex_unlock(&process_other->wait_lock); - } + controller_process_wait(main, process_other); } else { 
f_thread_unlock(&process_other->lock); @@ -1989,7 +1985,7 @@ extern "C" { return status; } - process = &main.thread->processs.array[at]; + process = main.thread->processs.array[at]; f_thread_lock_read(&process->active); f_thread_lock_write(&process->lock); @@ -4606,14 +4602,14 @@ extern "C" { for (; i < main.thread->processs.used && main.thread->enabled; ++i, skip = F_false) { - process = &main.thread->processs.array[i]; + process = main.thread->processs.array[i]; if (caller) { f_thread_lock_read(&main.thread->lock.rule); for (j = 0; j < caller->stack.used; ++j) { - if (fl_string_dynamic_compare(process->rule.alias, main.thread->processs.array[caller->stack.array[j]].rule.alias) == F_equal_to) { + if (main.thread->processs.array[caller->stack.array[j]] && fl_string_dynamic_compare(process->rule.alias, main.thread->processs.array[caller->stack.array[j]]->rule.alias) == F_equal_to) { skip = F_true; } @@ -4636,9 +4632,16 @@ extern "C" { f_thread_lock_write(&process->lock); if (process->state == controller_process_state_done) { - f_thread_join(process->id_thread, 0); - process->state = controller_process_state_idle; - process->id_thread = 0; + f_thread_unlock(&process->active); + + if (f_thread_lock_write_try(&process->active) == F_none) { + f_thread_join(process->id_thread, 0); + process->state = controller_process_state_idle; + process->id_thread = 0; + } + else { + f_thread_lock_read(&process->active); + } } } @@ -4649,10 +4652,14 @@ extern "C" { continue; } - f_thread_unlock(&process->lock); - f_thread_mutex_lock(&process->wait_lock); - f_thread_condition_wait(&process->wait, &process->wait_lock); - f_thread_mutex_unlock(&process->wait_lock); + if (process->state == controller_process_state_active || process->state == controller_process_state_busy) { + f_thread_unlock(&process->lock); + + controller_process_wait(main, process); + } + else { + f_thread_unlock(&process->lock); + } f_thread_unlock(&process->active); f_thread_lock_read(&main.thread->lock.process); diff 
--git a/level_3/controller/c/private-thread.c b/level_3/controller/c/private-thread.c index a862844..b8b6c85 100644 --- a/level_3/controller/c/private-thread.c +++ b/level_3/controller/c/private-thread.c @@ -27,20 +27,25 @@ extern "C" { f_array_length_t i = 0; - for (; i < main->thread->processs.used && main->thread->enabled; ++i) { + for (; i < main->thread->processs.size && main->thread->enabled; ++i) { - process = &main->thread->processs.array[i]; + if (!main->thread->processs.array[i]) continue; + process = main->thread->processs.array[i]; + + // if "active" has a read lock, then do not attempt to clean it. if (f_thread_lock_write_try(&process->active) != F_none) { continue; } + // if "lock" has a read or write lock, then do not attempt to clean it. if (f_thread_lock_write_try(&process->lock) != F_none) { f_thread_unlock(&process->active); continue; } + // if process is active or busy, then do not attempt to clean it. if (process->state == controller_process_state_active || process->state == controller_process_state_busy) { f_thread_unlock(&process->active); f_thread_unlock(&process->lock); @@ -48,9 +53,17 @@ extern "C" { continue; } + f_thread_unlock(&process->lock); + if (process->id_thread) { f_thread_join(process->id_thread, 0); + if (!main->thread->enabled) { + f_thread_unlock(&process->active); + + break; + } + f_thread_lock_write(&process->lock); process->state = controller_process_state_idle; @@ -64,24 +77,30 @@ extern "C" { f_type_array_lengths_resize(0, &process->stack); f_thread_unlock(&process->active); - f_thread_unlock(&process->lock); } // for - if (main->thread->processs.used) { - for (i = main->thread->processs.used - 1; main->thread->processs.used && main->thread->enabled; --i) { + if (main->thread->processs.size) { + f_array_length_t j = main->thread->processs.size; + + for (i = main->thread->processs.size - 1; j && main->thread->enabled; --i, --j) { - process = &main->thread->processs.array[i]; + if (!main->thread->processs.array[i]) 
continue; + process = main->thread->processs.array[i]; + + // if "active" has a read lock, then do not attempt to clean it. if (f_thread_lock_write_try(&process->active) != F_none) { break; } + // if "lock" has a read or write lock, then do not attempt to clean it. if (f_thread_lock_write_try(&process->lock) != F_none) { f_thread_unlock(&process->active); break; } + // if process is active or busy, then do not attempt to clean it. if (process->state == controller_process_state_active || process->state == controller_process_state_busy) { f_thread_unlock(&process->active); f_thread_unlock(&process->lock); @@ -89,9 +108,17 @@ extern "C" { break; } + f_thread_unlock(&process->lock); + if (process->id_thread) { f_thread_join(process->id_thread, 0); + if (!main->thread->enabled) { + f_thread_unlock(&process->active); + + break; + } + f_thread_lock_write(&process->lock); process->state = controller_process_state_idle; @@ -107,7 +134,6 @@ extern "C" { --main->thread->processs.used; f_thread_unlock(&process->active); - f_thread_unlock(&process->lock); } // for } @@ -140,21 +166,23 @@ extern "C" { // @todo redesign this to use timed waits, that include a counter and a max wait such that when max wait is reached, send kill signals. // this would, in theory, allow faster exits without as much waiting when there is nothing to wait for. 
- if (main->thread->processs.used) { + if (main->thread->processs.size) { f_thread_unlock(&main->thread->lock.process); usleep(controller_thread_exit_process_force_timeout); f_thread_lock_read(&main->thread->lock.process); - for (f_array_length_t i = 0; i < main->thread->processs.used; ++i) { + for (f_array_length_t i = 0; i < main->thread->processs.size; ++i) { + + if (!main->thread->processs.array[i]) continue; - if (main->thread->processs.array[i].child > 0) { - f_signal_send(F_signal_kill, main->thread->processs.array[i].child); + if (main->thread->processs.array[i]->child > 0) { + f_signal_send(F_signal_kill, main->thread->processs.array[i]->child); } - if (main->thread->processs.array[i].id_thread) { - f_thread_signal(main->thread->processs.array[i].id_thread, F_signal_kill); + if (main->thread->processs.array[i]->id_thread) { + f_thread_signal(main->thread->processs.array[i]->id_thread, F_signal_kill); } } // for } @@ -386,42 +414,48 @@ extern "C" { for (; i < main->thread->processs.used; ++i) { - process = &main->thread->processs.array[i]; + if (!main->thread->processs.array[i]) continue; + + process = main->thread->processs.array[i]; - f_thread_lock_read(&process->lock); + f_thread_lock_read(&process->active); if (process->child > 0) { f_signal_send(F_signal_termination, process->child); } - f_thread_unlock(&process->lock); + f_thread_unlock(&process->active); } // for for (i = 0; i < main->thread->processs.used; ++i) { - process = &main->thread->processs.array[i]; + if (!main->thread->processs.array[i]) continue; - f_thread_lock_read(&process->lock); + process = main->thread->processs.array[i]; + + f_thread_lock_read(&process->active); if (process->id_thread) { f_thread_cancel(process->id_thread); } - f_thread_unlock(&process->lock); + f_thread_unlock(&process->active); } // for for (i = 0; i < main->thread->processs.size; ++i) { - process = &main->thread->processs.array[i]; + if (!main->thread->processs.array[i]) continue; + + process = 
main->thread->processs.array[i]; if (process->id_thread) { f_thread_join(process->id_thread, 0); - f_thread_lock_write(&process->lock); + f_thread_lock_read(&process->active); process->id_thread = 0; - f_thread_unlock(&process->lock); + f_thread_unlock(&process->active); } } // for -- 1.8.3.1