From: Kevin Day Date: Fri, 12 Feb 2021 03:53:11 +0000 (-0600) Subject: Progress: controller program. X-Git-Tag: 0.5.3~103 X-Git-Url: https://git.kevux.org/?a=commitdiff_plain;h=c6e77d53eec62d5356d5f6546ad27e9af4febabe;p=fll Progress: controller program. Continue working on the thread support. As I am learning how to use threading, I am finding that my conversion is flawed in some way and somehow losing the pointer address. I worry that this might be related to how realloc() changes the pointer address, but there is no strong evidence to this fear. I've decided to just save the current state (regardless of its state) and will rethink the design. Currently the behavior is more consistent but still problematic. --- diff --git a/level_3/controller/c/private-common.h b/level_3/controller/c/private-common.h index 86bc79c..2215a9c 100644 --- a/level_3/controller/c/private-common.h +++ b/level_3/controller/c/private-common.h @@ -482,7 +482,7 @@ extern "C" { f_status_t status; f_thread_mutex_t lock; - f_thread_mutex_t wait; + f_thread_condition_t wait; f_number_unsigned_t timeout_kill; f_number_unsigned_t timeout_start; @@ -516,15 +516,13 @@ extern "C" { f_execute_scheduler_t scheduler; controller_rule_items_t items; - - void *asynchronous; } controller_rule_t; #define controller_rule_t_initialize \ { \ F_known_not, \ f_thread_mutex_t_initialize, \ - f_thread_mutex_t_initialize, \ + f_thread_condition_t_initialize, \ 0, \ 0, \ 0, \ @@ -549,12 +547,11 @@ extern "C" { f_limit_sets_t_initialize, \ f_execute_scheduler_t_initialize, \ controller_rule_items_initialize, \ - 0, \ } #define controller_macro_rule_t_delete_simple(rule) \ f_macro_thread_mutex_t_delete_simple(rule.lock) \ - f_macro_thread_mutex_t_delete_simple(rule.wait) \ + f_macro_thread_condition_t_delete_simple(rule.wait) \ f_macro_string_dynamic_t_delete_simple(rule.id) \ f_macro_string_dynamic_t_delete_simple(rule.name) \ f_macro_string_dynamic_t_delete_simple(rule.path) \ @@ -889,6 +886,7 @@ extern "C" { typedef struct { f_thread_id_t id; + f_thread_mutex_t lock; f_array_length_t index; uint8_t state; @@ -901,7 +899,7 @@ extern "C" { controller_cache_action_t cache; } controller_asynchronous_t; - #define controller_asynchronous_t_initialize { f_thread_id_t_initialize, 0, 0, 0, 0, 0, 0, f_array_lengths_t_initialize, controller_cache_action_t_initialize } + #define controller_asynchronous_t_initialize { f_thread_id_t_initialize, f_thread_mutex_t_initialize, 0, 0, 0, 0, 0, 0, f_array_lengths_t_initialize, controller_cache_action_t_initialize } #define controller_macro_asynchronous_t_clear(asynchronous) \ f_macro_thread_id_t_clear(asynchronous.id) \ diff --git a/level_3/controller/c/private-controller.c b/level_3/controller/c/private-controller.c index 755de05..a976a0a 100644 --- a/level_3/controller/c/private-controller.c +++ b/level_3/controller/c/private-controller.c @@ -937,16 +937,21 @@ extern "C" { rule_options |= controller_rule_option_wait; } + f_thread_mutex_lock(&thread->setting->rules.array[at].lock); + if (actions->array[cache->ats.array[at_j]].code & controller_entry_rule_code_asynchronous) { rule_options |= controller_rule_option_asynchronous; status = controller_rule_process_asynchronous(at, controller_rule_action_type_start, rule_options, thread); } else { - status = controller_rule_process(at, controller_rule_action_type_start, rule_options, thread); + status = controller_rule_process(at, controller_rule_action_type_start, rule_options, thread, 0); } if (status == F_child || status == F_signal) break; + + f_thread_condition_signal(&thread->setting->rules.array[at].wait); + f_thread_mutex_unlock(&thread->setting->rules.array[at].lock); } // restore cache. diff --git a/level_3/controller/c/private-rule.c b/level_3/controller/c/private-rule.c index 0cb532c..ecdd625 100644 --- a/level_3/controller/c/private-rule.c +++ b/level_3/controller/c/private-rule.c @@ -510,7 +510,7 @@ extern "C" { #endif // _di_controller_rule_error_print_need_want_wish_ #ifndef _di_controller_rule_execute_ - f_status_t controller_rule_execute(const f_array_length_t index, const uint8_t type, const uint8_t options, controller_thread_t *thread) { + f_status_t controller_rule_execute(const f_array_length_t index, const uint8_t type, const uint8_t options, controller_thread_t *thread, controller_asynchronous_t *asynchronous) { f_status_t status = F_none; f_status_t success = F_false; @@ -613,7 +613,11 @@ extern "C" { action = &item->actions.array[j]; execute_set.parameter.data = 0; - execute_set.parameter.option = fl_execute_parameter_option_threadsafe & fl_execute_parameter_option_return; + execute_set.parameter.option = fl_execute_parameter_option_threadsafe; + + if (options & controller_rule_option_asynchronous) { + execute_set.parameter.option |= fl_execute_parameter_option_return; + } if (item->type == controller_rule_item_type_command) { @@ -621,7 +625,7 @@ extern "C" { execute_set.parameter.option |= fl_execute_parameter_option_path; } - status = controller_rule_execute_foreground(index, item->type, *action, 0, action->parameters, options, &execute_set, thread); + status = controller_rule_execute_foreground(index, item->type, *action, 0, action->parameters, options, &execute_set, thread, asynchronous); if (status == F_child) break; @@ -640,7 +644,7 @@ extern "C" { execute_set.parameter.option |= fl_execute_parameter_option_path; } - status = controller_rule_execute_foreground(index, item->type, *action, rule->script.used ? rule->script.string : controller_default_program_script, arguments_none, options, &execute_set, thread); + status = controller_rule_execute_foreground(index, item->type, *action, rule->script.used ? rule->script.string : controller_default_program_script, arguments_none, options, &execute_set, thread, asynchronous); if (status == F_child) break; @@ -658,7 +662,7 @@ extern "C" { execute_set.parameter.option |= fl_execute_parameter_option_path; } - status = controller_rule_execute_pid_with(index, item->type, *action, 0, action->parameters, options, &execute_set, thread); + status = controller_rule_execute_pid_with(index, item->type, *action, 0, action->parameters, options, &execute_set, thread, asynchronous); if (status == F_child) break; @@ -719,10 +723,11 @@ extern "C" { #endif // _di_controller_rule_execute_ #ifndef _di_controller_rule_execute_foreground_ - f_status_t controller_rule_execute_foreground(const f_array_length_t index, const uint8_t type, const controller_rule_action_t action, const f_string_t program, const f_string_dynamics_t arguments, const uint8_t options, controller_execute_set_t * const execute_set, controller_thread_t *thread) { + f_status_t controller_rule_execute_foreground(const f_array_length_t index, const uint8_t type, const controller_rule_action_t action, const f_string_t program, const f_string_dynamics_t arguments, const uint8_t options, controller_execute_set_t * const execute_set, controller_thread_t *thread, controller_asynchronous_t *asynchronous) { f_status_t status = F_none; int result = 0; + pid_t id_process = 0; if (options & controller_rule_option_simulate) { @@ -752,35 +757,23 @@ extern "C" { const f_string_statics_t simulated_arguments = f_string_statics_t_initialize; fl_execute_parameter_t simulated_parameter = fl_macro_execute_parameter_t_initialize(execute_set->parameter.option, execute_set->parameter.environment, execute_set->parameter.signals, &simulated_program); - status = fll_execute_program(controller_default_program_script, simulated_arguments, &simulated_parameter, &execute_set->as, &result); + status = fll_execute_program(controller_default_program_script, simulated_arguments, &simulated_parameter, &execute_set->as, simulated_parameter.option & fl_execute_parameter_option_return ? (void *) &result : (void *) &id_process); } else { - status = fll_execute_program(program, arguments, &execute_set->parameter, &execute_set->as, &result); + status = fll_execute_program(program, arguments, &execute_set->parameter, &execute_set->as, execute_set->parameter.option & fl_execute_parameter_option_return ? (void *) &result : (void *) &id_process); } if (status == F_parent) { - controller_asynchronous_t *asynchronous = (controller_asynchronous_t *) thread->setting->rules.array[index].asynchronous; + result = 0; // assign the child process id to the asynchronous thread to allow for the cancel process to send appropriate termination signals to the child process. - if (thread->asynchronouss.enabled) { - f_thread_mutex_lock(&thread->mutex->asynchronous); - - asynchronous->child = result; - - f_thread_mutex_unlock(&thread->mutex->asynchronous); - } + asynchronous->child = id_process; // have the parent wait for the child process to finish. - waitpid(asynchronous->child, &result, WUNTRACED | WCONTINUED); + waitpid(id_process, &result, WUNTRACED | WCONTINUED); // remove the pid now that waidpid() has returned. - if (thread->asynchronouss.enabled) { - f_thread_mutex_lock(&thread->mutex->asynchronous); - - asynchronous->child = 0; - - f_thread_mutex_unlock(&thread->mutex->asynchronous); - } + asynchronous->child = 0; // this must explicitly check for 0 (as opposed to checking (!result)). if (!WIFEXITED(result)) { @@ -800,8 +793,6 @@ extern "C" { } if (status == F_child) { - //thread->data->child = result; // @fixme cant do this! - return F_child; } @@ -829,17 +820,16 @@ extern "C" { status = F_status_set_error(status); } - // thread->data->child = 0; // @fixme cant do this! - return status; } #endif // _di_controller_rule_execute_foreground_ #ifndef _di_controller_rule_execute_pid_with_ - f_status_t controller_rule_execute_pid_with(const f_array_length_t index, const uint8_t type, const controller_rule_action_t action, const f_string_t program, const f_string_dynamics_t arguments, const uint8_t options, controller_execute_set_t * const execute_set, controller_thread_t *thread) { + f_status_t controller_rule_execute_pid_with(const f_array_length_t index, const uint8_t type, const controller_rule_action_t action, const f_string_t program, const f_string_dynamics_t arguments, const uint8_t options, controller_execute_set_t * const execute_set, controller_thread_t *thread, controller_asynchronous_t *asynchronous) { f_status_t status = F_none; int result = 0; + pid_t id_process = 0; // @todo check to see if pid file exists. @@ -876,37 +866,23 @@ extern "C" { const f_string_statics_t simulated_arguments = f_string_statics_t_initialize; fl_execute_parameter_t simulated_parameter = fl_macro_execute_parameter_t_initialize(execute_set->parameter.option, execute_set->parameter.environment, execute_set->parameter.signals, &simulated_program); - status = fll_execute_program(controller_default_program_script, simulated_arguments, &simulated_parameter, &execute_set->as, &result); + status = fll_execute_program(controller_default_program_script, simulated_arguments, &simulated_parameter, &execute_set->as, simulated_parameter.option & fl_execute_parameter_option_return ? (void *) &result : (void *) &id_process); } else { - status = fll_execute_program(program, arguments, &execute_set->parameter, &execute_set->as, &result); + status = fll_execute_program(program, arguments, &execute_set->parameter, &execute_set->as, execute_set->parameter.option & fl_execute_parameter_option_return ? (void *) &result : (void *) &id_process); } if (status == F_parent) { - controller_asynchronous_t *asynchronous = (controller_asynchronous_t *) thread->setting->rules.array[index].asynchronous; - result = 0; // assign the child process id to the asynchronous thread to allow for the cancel process to send appropriate termination signals to the child process. - if (thread->asynchronouss.enabled) { - f_thread_mutex_lock(&thread->mutex->asynchronous); - - asynchronous->child = result; + asynchronous->child = id_process; - f_thread_mutex_unlock(&thread->mutex->asynchronous); - - // have the parent wait for the child process to finish. (@todo see comments above about forking into the background, this code block will need to change, maybe pass WNOHANG?.) - waitpid(asynchronous->child, &result, WUNTRACED | WCONTINUED); - } + // have the parent wait for the child process to finish. + waitpid(id_process, &result, WUNTRACED | WCONTINUED); // remove the pid now that waidpid() has returned. - if (thread->asynchronouss.enabled) { - f_thread_mutex_lock(&thread->mutex->asynchronous); - - asynchronous->child = 0; - - f_thread_mutex_unlock(&thread->mutex->asynchronous); - } + asynchronous->child = 0; // this must explicitly check for 0 (as opposed to checking (!result)). if (!WIFEXITED(result)) { @@ -926,8 +902,6 @@ extern "C" { } if (status == F_child) { - //thread->data->child = result; // @fixme cant do this! - return F_child; } @@ -952,15 +926,11 @@ extern "C" { f_thread_mutex_unlock(&thread->mutex->print); - //thread->data->child = 0; // @fixme cant do this! - return F_status_set_error(status); } // @todo wait for pid file or timeout. - //thread->data->child = 0; // @fixme cant do this! - return status; } #endif // _di_controller_rule_execute_pid_with_ @@ -1383,7 +1353,7 @@ extern "C" { #endif // _di_controller_rule_path_ #ifndef _di_controller_rule_process_ - f_status_t controller_rule_process(const f_array_length_t index, const uint8_t action, const uint8_t options, controller_thread_t *thread) { + f_status_t controller_rule_process(const f_array_length_t index, const uint8_t action, const uint8_t options, controller_thread_t *thread, controller_asynchronous_t *asynchronous) { switch (action) { case controller_rule_action_type_freeze: @@ -1447,12 +1417,16 @@ extern "C" { if (thread->stack->array[i] == index) { if (thread->data->error.verbosity != f_console_verbosity_quiet) { + f_thread_mutex_lock(&thread->mutex->print); + fprintf(thread->data->error.to.stream, "%c", f_string_eol_s[0]); fprintf(thread->data->error.to.stream, "%s%sThe rule '", thread->data->error.context.before->string, thread->data->error.prefix ? thread->data->error.prefix : f_string_empty_s); fprintf(thread->data->error.to.stream, "%s%s%s%s", thread->data->error.context.after->string, thread->data->error.notable.before->string, thread->setting->rules.array[i].name.string, thread->data->error.notable.after->string); fprintf(thread->data->error.to.stream, "%s' is already on the execution stack, this recursion is prohibited.%s%c", thread->data->error.context.before->string, thread->data->error.context.after->string, f_string_eol_s[0]); controller_rule_error_print(thread->data->error, *thread->cache_action, F_true); + + f_thread_mutex_unlock(&thread->mutex->print); } // never continue on recursion errors even in simulate mode. @@ -1471,18 +1445,26 @@ extern "C" { } if (F_status_is_error(status)) { + f_thread_mutex_lock(&thread->mutex->print); + fll_error_print(thread->data->error, F_status_set_fine(status), "f_string_append", F_true); controller_rule_error_print(thread->data->error, *thread->cache_action, F_true); + f_thread_mutex_unlock(&thread->mutex->print); + return status; } status = f_string_dynamic_append(thread->setting->rules.array[index].id, &thread->cache_action->name_file); if (F_status_is_error(status)) { + f_thread_mutex_lock(&thread->mutex->print); + fll_error_print(thread->data->error, F_status_set_fine(status), "f_string_dynamic_append", F_true); controller_rule_error_print(thread->data->error, *thread->cache_action, F_true); + f_thread_mutex_unlock(&thread->mutex->print); + return status; } @@ -1493,18 +1475,26 @@ extern "C" { } if (F_status_is_error(status)) { + f_thread_mutex_lock(&thread->mutex->print); + fll_error_print(thread->data->error, F_status_set_fine(status), "f_string_append", F_true); controller_rule_error_print(thread->data->error, *thread->cache_action, F_true); + f_thread_mutex_unlock(&thread->mutex->print); + return status; } status = f_string_dynamic_terminate_after(&thread->cache_action->name_file); if (F_status_is_error(status)) { + f_thread_mutex_lock(&thread->mutex->print); + fll_error_print(thread->data->error, F_status_set_fine(status), "f_string_dynamic_terminate_after", F_true); controller_rule_error_print(thread->data->error, *thread->cache_action, F_true); + f_thread_mutex_unlock(&thread->mutex->print); + return status; } @@ -1540,24 +1530,44 @@ extern "C" { if (at == thread->setting->rules.used) { if (i == 0) { + f_thread_mutex_lock(&thread->mutex->print); + controller_rule_error_print_need_want_wish(thread->data->error, strings[i], dynamics[i]->array[j].string, "was not found"); status = F_status_set_error(F_found_not); controller_rule_error_print(thread->data->error, *thread->cache_action, F_true); + f_thread_mutex_unlock(&thread->mutex->print); + if (!(options & controller_rule_option_simulate)) break; } else { if (thread->data->warning.verbosity == f_console_verbosity_debug) { + f_thread_mutex_lock(&thread->mutex->print); + controller_rule_error_print_need_want_wish(thread->data->warning, strings[i], dynamics[i]->array[j].string, "was not found"); controller_rule_error_print(thread->data->warning, *thread->cache_action, F_true); + + f_thread_mutex_unlock(&thread->mutex->print); } } } if (F_status_is_error_not(status) && at < thread->setting->rules.used) { - controller_rule_wait_for(at, thread); + f_thread_mutex_lock(&thread->setting->rules.array[at].lock); + + while (thread->asynchronouss.enabled && thread->setting->rules.array[at].status == F_known_not) { + f_thread_condition_wait(&thread->setting->rules.array[at].wait, &thread->setting->rules.array[at].lock); + } // while + + if (!thread->asynchronouss.enabled) { + f_thread_condition_signal(&thread->setting->rules.array[at].wait); + f_thread_mutex_unlock(&thread->setting->rules.array[at].lock); + + status = F_signal; + break; + } // when the status is unknown, then the rule has not been executed, so attempt to execute it. if (thread->setting->rules.array[at].status == F_known_not) { @@ -1565,9 +1575,16 @@ extern "C" { f_macro_array_lengths_t_increase_by(status, (*thread->stack), controller_default_allocation_step) if (F_status_is_error(status)) { + f_thread_mutex_lock(&thread->mutex->print); + fll_error_print(thread->data->error, F_status_set_fine(status), "f_macro_array_lengths_t_increase_by", F_true); controller_rule_error_print(thread->data->error, *thread->cache_action, F_true); + f_thread_mutex_unlock(&thread->mutex->print); + + f_thread_condition_signal(&thread->setting->rules.array[at].wait); + f_thread_mutex_unlock(&thread->setting->rules.array[at].lock); + // always exit on memory errors, even in simulate mode. break; } @@ -1589,9 +1606,14 @@ extern "C" { memcpy(cache_name_file, thread->cache_action->name_file.string, thread->cache_action->name_file.used); // recursive rule processing is to always be synchronous. - status = controller_rule_process(at, action, options, thread); + status = controller_rule_process(at, action, options & controller_rule_option_asynchronous ? options - controller_rule_option_asynchronous : options, thread, 0); + + if (status == F_child || status == F_signal) { + f_thread_condition_signal(&thread->setting->rules.array[at].wait); + f_thread_mutex_unlock(&thread->setting->rules.array[at].lock); - if (status == F_child || status == F_signal) break; + break; + } // restore cache. memcpy(thread->cache_action->name_action.string, cache_name_action, cache_name_action_used); @@ -1611,35 +1633,58 @@ extern "C" { if (F_status_is_error(status)) { if (i == 0 || i == 1 || F_status_set_fine(status) == F_memory_not) { + f_thread_mutex_lock(&thread->mutex->print); + controller_rule_error_print_need_want_wish(thread->data->error, strings[i], dynamics[i]->array[j].string, "failed during execution"); controller_rule_error_print(thread->data->error, *thread->cache_action, F_true); + f_thread_mutex_unlock(&thread->mutex->print); + if (!(options & controller_rule_option_simulate) || F_status_set_fine(status) == F_memory_not) { + f_thread_condition_signal(&thread->setting->rules.array[at].wait); + f_thread_mutex_unlock(&thread->setting->rules.array[at].lock); + break; } } else { if (thread->data->warning.verbosity == f_console_verbosity_debug) { + f_thread_mutex_lock(&thread->mutex->print); + controller_rule_error_print_need_want_wish(thread->data->warning, strings[i], dynamics[i]->array[j].string, "failed during execution"); controller_rule_error_print(thread->data->warning, *thread->cache_action, F_true); + + f_thread_mutex_unlock(&thread->mutex->print); } } } } - else if (F_status_is_error(thread->setting->rules.array[at].status)) { + + f_thread_condition_signal(&thread->setting->rules.array[at].wait); + f_thread_mutex_unlock(&thread->setting->rules.array[at].lock); + + if (F_status_is_error(thread->setting->rules.array[at].status)) { if (i == 0 || i == 1) { + f_thread_mutex_lock(&thread->mutex->print); + controller_rule_error_print_need_want_wish(thread->data->error, strings[i], dynamics[i]->array[j].string, "is in a failed state"); status = F_status_set_error(F_found_not); controller_rule_error_print(thread->data->error, *thread->cache_action, F_true); + f_thread_mutex_unlock(&thread->mutex->print); + if (!(options & controller_rule_option_simulate)) break; } else { if (thread->data->warning.verbosity == f_console_verbosity_debug) { + f_thread_mutex_lock(&thread->mutex->print); + controller_rule_error_print_need_want_wish(thread->data->warning, strings[i], dynamics[i]->array[j].string, "is in a failed state"); controller_rule_error_print(thread->data->warning, *thread->cache_action, F_true); + + f_thread_mutex_unlock(&thread->mutex->print); } } } @@ -1656,8 +1701,17 @@ extern "C" { return status; } + if (!thread->asynchronouss.enabled) { + return F_signal; + } + + // @fixme changing design, "wait" will be a distinct thing not associated with individual rules. this will be in its own process function. if (!(options & controller_rule_option_wait) && F_status_is_error_not(status)) { controller_rule_wait_all(thread); + + if (!thread->asynchronouss.enabled) { + return F_signal; + } } if (!(options & controller_rule_option_validate) && F_status_is_error_not(status)) { @@ -1682,6 +1736,8 @@ extern "C" { if (missing) { if (thread->data->error.verbosity != f_console_verbosity_quiet) { + f_thread_mutex_lock(&thread->mutex->print); + fprintf(thread->data->error.to.stream, "%c", f_string_eol_s[0]); fprintf(thread->data->error.to.stream, "%s%sThe rule '", thread->data->error.context.before->string, thread->data->error.prefix ? thread->data->error.prefix : f_string_empty_s); fprintf(thread->data->error.to.stream, "%s%s%s%s", thread->data->error.context.after->string, thread->data->error.notable.before->string, rule->name.used ? rule->name.string : f_string_empty_s, thread->data->error.notable.after->string); @@ -1690,6 +1746,8 @@ extern "C" { fprintf(thread->data->error.to.stream, "%s' action to execute.%s%c", thread->data->error.context.before->string, thread->data->error.context.after->string, f_string_eol_s[0]); controller_rule_error_print(thread->data->error, *thread->cache_action, F_true); + + f_thread_mutex_unlock(&thread->mutex->print); } status = F_status_set_error(F_parameter); @@ -1697,14 +1755,18 @@ extern "C" { } if (F_status_is_error_not(status)) { - status = controller_rule_execute(index, action, options, thread); + status = controller_rule_execute(index, action, options, thread, asynchronous); if (status == F_child) { return F_child; } + if (!thread->asynchronouss.enabled) { + return F_signal; + } + if (F_status_is_error(status)) { - controller_rule_error_print(thread->data->error, *thread->cache_action, F_true); + controller_rule_error_print_locked(thread->data->error, *thread->cache_action, F_true, thread); } } } @@ -1788,7 +1850,6 @@ extern "C" { bool for_item = F_true; rule->status = F_known_not; - rule->asynchronous = 0; // @todo: timeouts may be passed from entry, consider to or not to initialize in a more consistent manner. //rule->timeout_kill = 2; @@ -3988,67 +4049,46 @@ extern "C" { #ifndef _di_controller_rule_wait_all_ void controller_rule_wait_all(controller_thread_t *thread) { - for (f_array_length_t i = 0; i < thread->asynchronouss.used; ++i) { + for (f_array_length_t i = 0; i < thread->asynchronouss.used && thread->asynchronouss.enabled; ++i) { - if (!thread->asynchronouss.array[i].state) continue; - - controller_rule_wait_for(i, thread); - } // for - } -#endif // _di_controller_rule_wait_all_ - -#ifndef _di_controller_rule_wait_for_ - void controller_rule_wait_for(const f_array_length_t index, controller_thread_t *thread) { + // do not need to wait when state is 0 or joined. + if (!thread->asynchronouss.array[i].state || thread->asynchronouss.array[i].state == controller_asynchronous_state_joined) { + continue; + } - if (index >= thread->setting->rules.used) { - return; - } + if (thread->asynchronouss.array[i].index >= thread->setting->rules.used) { + continue; + } - controller_rule_t *rule = &thread->setting->rules.array[index]; - controller_asynchronous_t *asynchronous = (controller_asynchronous_t *) rule->asynchronous; + f_thread_mutex_lock(&thread->setting->rules.array[thread->asynchronouss.array[i].index].lock); - if (!rule->asynchronous) { - return; - } + while (thread->asynchronouss.enabled && thread->setting->rules.array[thread->asynchronouss.array[i].index].status == F_known_not) { + f_thread_condition_wait(&thread->setting->rules.array[thread->asynchronouss.array[i].index].wait, &thread->setting->rules.array[thread->asynchronouss.array[i].index].lock); + } // while - // do not need to wait when state is 0. - if (!asynchronous->state) { - return; - } + f_thread_mutex_unlock(&thread->setting->rules.array[thread->asynchronouss.array[i].index].lock); - if (f_thread_mutex_lock_try(&rule->wait) == F_none) { + if (!thread->asynchronouss.enabled) break; - if (asynchronous->state != controller_asynchronous_state_joined) { - f_thread_join(asynchronous->id, 0); - } + if (f_thread_mutex_lock_try(&thread->mutex->asynchronous) == F_none) { - if (thread->asynchronouss.enabled) { - if (f_thread_mutex_lock_try(&thread->mutex->asynchronous) == F_none) { - if (asynchronous->state) { - if (asynchronous->state == controller_asynchronous_state_done) { - asynchronous->state = controller_asynchronous_state_joined; - } + if (thread->asynchronouss.array[i].state != controller_asynchronous_state_joined) { + f_thread_join(thread->asynchronouss.array[i].id, 0); + } - controller_macro_cache_action_t_clear(asynchronous->cache); + if (thread->asynchronouss.enabled && thread->asynchronouss.array[i].state) { + if (thread->asynchronouss.array[i].state == controller_asynchronous_state_done) { + thread->asynchronouss.array[i].state = controller_asynchronous_state_joined; } - f_thread_mutex_unlock(&thread->mutex->asynchronous); + controller_macro_cache_action_t_clear(thread->asynchronouss.array[i].cache); } - } - - f_thread_mutex_unlock(&rule->wait); - } - else { - - // a wait lock is already in place, which will also be responsible for thread joining. - // this can therefore immediately unlock and return. - f_thread_mutex_lock(&rule->wait); - - f_thread_mutex_unlock(&rule->wait); - } + f_thread_mutex_unlock(&thread->mutex->asynchronous); + } + } // for } -#endif // _di_controller_rule_wait_for_ +#endif // _di_controller_rule_wait_all_ #ifdef __cplusplus } // extern "C" diff --git a/level_3/controller/c/private-rule.h b/level_3/controller/c/private-rule.h index 0de920e..6ecb15c 100644 --- a/level_3/controller/c/private-rule.h +++ b/level_3/controller/c/private-rule.h @@ -244,6 +244,9 @@ extern "C" { * If bit controller_rule_option_simulate, then the rule execution is in simulation mode (printing a message that the rule would be executed but does not execute the rule). * @param thread * The thread data. + * @param asynchronous + * Holds the current asynchronous thread information if this is being run from within one. + * Set to NULL when this is not being called from within an asynchronous thread. * * @return * F_none on success. @@ -255,7 +258,7 @@ extern "C" { * On failure, the individual status for the rule is set to an appropriate error status. */ #ifndef _di_controller_rule_execute_ - extern f_status_t controller_rule_execute(const f_array_length_t index, const uint8_t type, const uint8_t options, controller_thread_t *thread) f_gcc_attribute_visibility_internal; + extern f_status_t controller_rule_execute(const f_array_length_t index, const uint8_t type, const uint8_t options, controller_thread_t *thread, controller_asynchronous_t *asynchronous) f_gcc_attribute_visibility_internal; #endif // _di_controller_rule_execute_ /** @@ -285,6 +288,9 @@ extern "C" { * The execute parameter and as settings. * @param thread * The thread data. + * @param asynchronous + * Holds the current asynchronous thread information if this is being run from within one. + * Set to NULL when this is not being called from within an asynchronous thread. * * @return * F_none on success. @@ -296,7 +302,7 @@ extern "C" { * @see fll_execute_program() */ #ifndef _di_controller_rule_execute_foreground_ - extern f_status_t controller_rule_execute_foreground(const f_array_length_t index, const uint8_t type, const controller_rule_action_t action, const f_string_t program, const f_string_dynamics_t arguments, const uint8_t options, controller_execute_set_t * const execute_set, controller_thread_t *thread) f_gcc_attribute_visibility_internal; + extern f_status_t controller_rule_execute_foreground(const f_array_length_t index, const uint8_t type, const controller_rule_action_t action, const f_string_t program, const f_string_dynamics_t arguments, const uint8_t options, controller_execute_set_t * const execute_set, controller_thread_t *thread, controller_asynchronous_t *asynchronous) f_gcc_attribute_visibility_internal; #endif // _di_controller_rule_execute_foreground_ /** @@ -329,6 +335,9 @@ extern "C" { * The execute parameter and as settings. * @param thread * The thread data. + * @param asynchronous + * Holds the current asynchronous thread information if this is being run from within one. + * Set to NULL when this is not being called from within an asynchronous thread. * * @return * F_none on success. @@ -341,7 +350,7 @@ extern "C" { * @see fll_execute_program() */ #ifndef _di_controller_rule_execute_pid_with_ - extern f_status_t controller_rule_execute_pid_with(const f_array_length_t index, const uint8_t type, const controller_rule_action_t action, const f_string_t program, const f_string_dynamics_t arguments, const uint8_t options, controller_execute_set_t * const execute_set, controller_thread_t *thread) f_gcc_attribute_visibility_internal; + extern f_status_t controller_rule_execute_pid_with(const f_array_length_t index, const uint8_t type, const controller_rule_action_t action, const f_string_t program, const f_string_dynamics_t arguments, const uint8_t options, controller_execute_set_t * const execute_set, controller_thread_t *thread, controller_asynchronous_t *asynchronous) f_gcc_attribute_visibility_internal; #endif // _di_controller_rule_execute_pid_with_ /** @@ -530,6 +539,9 @@ extern "C" { * If bit controller_rule_option_asynchronous, then run asynchronously. * @param thread * The thread data. + * @param asynchronous + * Holds the current asynchronous thread information if this is being run from within one. + * Set to NULL when this is not being called from within an asynchronous thread. * * @return * F_none on success. @@ -537,7 +549,7 @@ extern "C" { * F_signal on (exit) signal received. */ #ifndef _di_controller_rule_process_ - extern f_status_t controller_rule_process(const f_array_length_t index, const uint8_t action, const uint8_t options, controller_thread_t *thread) f_gcc_attribute_visibility_internal; + extern f_status_t controller_rule_process(const f_array_length_t index, const uint8_t action, const uint8_t options, controller_thread_t *thread, controller_asynchronous_t *asynchronous) f_gcc_attribute_visibility_internal; #endif // _di_controller_rule_process_ /** @@ -696,18 +708,6 @@ extern "C" { extern void controller_rule_wait_all(controller_thread_t *thread) f_gcc_attribute_visibility_internal; #endif // _di_controller_rule_wait_all_ -/** - * Wait until the specific rule is done running, if running asynchronously. - * - * @param index - * The index of the rule to wait for. - * @param thread - * The thread data. - */ -#ifndef _di_controller_rule_wait_for_ - extern void controller_rule_wait_for(const f_array_length_t index, controller_thread_t *thread) f_gcc_attribute_visibility_internal; -#endif // _di_controller_rule_wait_for_ - #ifdef __cplusplus } // extern "C" #endif diff --git a/level_3/controller/c/private-thread.c b/level_3/controller/c/private-thread.c index ba985c9..b39d6e1 100644 --- a/level_3/controller/c/private-thread.c +++ b/level_3/controller/c/private-thread.c @@ -22,6 +22,7 @@ extern "C" { controller_thread_t thread = controller_thread_t_initialize; f_thread_mutex_lock(&thread_main->setting->rules.array[asynchronous->index].lock); + f_thread_mutex_lock(&asynchronous->lock); thread.cache_main = thread_main->cache_main; thread.cache_action = &asynchronous->cache; @@ -30,20 +31,14 @@ extern "C" { thread.setting = thread_main->setting; thread.stack = &asynchronous->stack; - if (controller_rule_process(asynchronous->index, asynchronous->action, asynchronous->options, &thread) == F_child) { - // @todo consider returning 1 to designate that this is a child process exiting. - return 0; - } - - if (thread.asynchronouss.enabled) { - //f_thread_mutex_lock(&thread_main->mutex->asynchronous); - + if (controller_rule_process(asynchronous->index, asynchronous->action, asynchronous->options, &thread, asynchronous) != F_child) { asynchronous->state = controller_asynchronous_state_done; - - //f_thread_mutex_unlock(&thread_main->mutex->asynchronous); } - f_thread_mutex_unlock(&thread.setting->rules.array[asynchronous->index].lock); + f_thread_mutex_unlock(&asynchronous->lock); + + f_thread_condition_signal(&thread_main->setting->rules.array[asynchronous->index].wait); + f_thread_mutex_unlock(&thread_main->setting->rules.array[asynchronous->index].lock); return 0; } @@ -54,37 +49,32 @@ extern "C" { thread->asynchronouss.enabled = F_false; - f_array_length_t i = 0; - f_thread_mutex_lock(&thread->mutex->asynchronous); - for (; i < thread->asynchronouss.used; ++i) { + for (f_array_length_t i = 0; i < thread->asynchronouss.used; ++i) { if (!thread->asynchronouss.array[i].state) continue; - if (thread->asynchronouss.array[i].state == controller_asynchronous_state_active) { + if (f_thread_mutex_lock_try(&thread->asynchronouss.array[i].lock) == F_none) { + f_thread_cancel(thread->asynchronouss.array[i].id); + f_thread_detach(thread->asynchronouss.array[i].id); + f_thread_mutex_unlock(&thread->asynchronouss.array[i].lock); + } + else { if (thread->asynchronouss.array[i].child > 0) { f_signal_send(F_signal_termination, thread->asynchronouss.array[i].child); } + else { + f_thread_cancel(thread->asynchronouss.array[i].id); + } - thread->asynchronouss.array[i].state = controller_asynchronous_state_done; - } - - if (thread->asynchronouss.array[i].state == controller_asynchronous_state_done) { - // @todo perhaps a timed join here where if it takes to long, try sending a kill signal to the child process. - f_thread_join(thread->asynchronouss.array[i].id, 0); + // the cancel make take time so detach the process to allow it to exit on its own. + f_thread_detach(thread->asynchronouss.array[i].id); } - - thread->asynchronouss.array[i].state = 0; - - controller_macro_cache_action_t_clear(thread->asynchronouss.array[i].cache); } // for thread->asynchronouss.used = 0; - f_thread_mutex_unlock(&thread->mutex->print); - f_thread_mutex_unlock(&thread->mutex->cache); - f_thread_mutex_unlock(&thread->mutex->rule); f_thread_mutex_unlock(&thread->mutex->asynchronous); } #endif // _di_controller_thread_asynchronous_cancel_ @@ -98,7 +88,7 @@ extern "C" { f_array_length_t i = 0; - for (;;) { + for (; thread->asynchronouss.enabled; ) { sleep(interval); if (f_thread_mutex_lock_try(&thread->mutex->cache) == F_none) { @@ -110,8 +100,11 @@ extern "C" { if (thread->asynchronouss.used) { for (i = 0; i < thread->asynchronouss.used; ++i) { + if (!thread->asynchronouss.enabled) break; if (!thread->asynchronouss.array[i].state) continue; + if (f_thread_mutex_lock_try(&thread->asynchronouss.array[i].lock) != F_none) continue; + if (f_thread_mutex_lock_try(&thread->setting->rules.array[thread->asynchronouss.array[i].index].lock) == F_none) { if (thread->asynchronouss.array[i].state == controller_asynchronous_state_done) { @@ -125,24 +118,34 @@ extern "C" { thread->asynchronouss.array[i].state = 0; } + f_thread_condition_signal(&thread->setting->rules.array[thread->asynchronouss.array[i].index].wait); f_thread_mutex_unlock(&thread->setting->rules.array[thread->asynchronouss.array[i].index].lock); } + + f_thread_mutex_unlock(&thread->asynchronouss.array[i].lock); } // for for (i = thread->asynchronouss.used - 1; thread->asynchronouss.used; --i, --thread->asynchronouss.used) { + if (!thread->asynchronouss.enabled) break; + + if (f_thread_mutex_lock_try(&thread->asynchronouss.array[i].lock) != F_none) break; + if (thread->asynchronouss.array[i].state == controller_asynchronous_state_joined) { controller_macro_asynchronous_t_delete_simple(thread->asynchronouss.array[i]); thread->asynchronouss.array[i].state = 0; } else if (thread->asynchronouss.array[i].state) { + f_thread_mutex_unlock(&thread->asynchronouss.array[i].lock); break; } + + f_thread_mutex_unlock(&thread->asynchronouss.array[i].lock); } // for } - if (thread->asynchronouss.used < thread->asynchronouss.size) { + if (thread->asynchronouss.enabled && thread->asynchronouss.used < thread->asynchronouss.size) { controller_asynchronouss_resize(thread->asynchronouss.used, &thread->asynchronouss); } @@ -299,7 +302,9 @@ extern "C" { f_thread_join(thread_signal, 0); } - controller_thread_asynchronous_cancel(thread); + if (thread->asynchronouss.enabled) { + controller_thread_asynchronous_cancel(thread); + } f_thread_cancel(thread_cache); f_thread_cancel(thread_control); @@ -339,13 +344,15 @@ extern "C" { controller_thread_t *thread = (controller_thread_t *) arguments; - for (int signal = 0; ; ) { + for (int signal = 0; thread->asynchronouss.enabled; ) { sigwait(&thread->data->signal.set, &signal); if (thread->data->parameters[controller_parameter_interruptable].result == f_console_result_found) { if (signal == F_signal_interrupt || signal == F_signal_abort || signal == F_signal_quit || signal == F_signal_termination) { thread->setting->signal = signal; + + controller_thread_asynchronous_cancel(thread); break; } }