]> Kevux Git Server - fll/commitdiff
Bugfix: wait condition issues, memory addressing issues, and exiting issues.
authorKevin Day <thekevinday@gmail.com>
Sun, 4 Apr 2021 21:52:57 +0000 (16:52 -0500)
committerKevin Day <thekevinday@gmail.com>
Sun, 4 Apr 2021 22:45:42 +0000 (17:45 -0500)
The thread conditions, when waiting, may never stop waiting.
Switch to a timed wait, allowing for the condition to determine if it should stop waiting.
I attempting to signal the waits on exit, but this didn't work (however, this will still signal on exit).

Make sure to join all threads, add a failsafe thread join in the process delete function.

The array of processes is a single array.
When this array gets reallocated (resizing to a larger size) the memory addresses may change.
This is a serious problem in that the threads are using those addresses and they suddenly change.
The threads have no way of switching to the new addresses and memory problems happen.
Redesign the array of processes to an array of pointers to processes.
I considered using a block structure, but decided to keep it simple for now.
The downside of this is that on every resize, there must be another allocation for the process address being pointed to.
In the future, I may consider switching to a block structure where I allocated multiple blocks at a time, while still using pointers.

Rename "id_process" to "id_child" to avoid potential confusion between "processes" and "pids", given that I am using "process" with a very different context in this project.

Update all timeouts to be stored in macros.

The cleanup and exit functions were deadlocking, change the locking usage in these functions.

Add missing increment at the end of the loop when processing entry items.

level_3/controller/c/private-common.c
level_3/controller/c/private-common.h
level_3/controller/c/private-controller.c
level_3/controller/c/private-rule.c
level_3/controller/c/private-thread.c

index b38b5390221d2c088ed1ae14d1eaf250ebb8e8ea..a2ccf1929a2af6a7cbc9596b5a670604a207055d 100644 (file)
@@ -166,10 +166,16 @@ extern "C" {
 #ifndef _di_controller_process_delete_simple_
   void controller_process_delete_simple(controller_process_t *process) {
 
+    if (process->id_thread) {
+      f_thread_join(process->id_thread, 0);
+    }
+
+    f_thread_condition_signal_all(&process->wait);
+    f_thread_condition_delete(&process->wait);
+
     controller_lock_delete_rw(&process->lock);
     controller_lock_delete_rw(&process->active);
     controller_lock_delete_mutex(&process->wait_lock);
-    f_thread_condition_delete(&process->wait);
 
     controller_cache_delete_simple(&process->cache);
     controller_rule_delete_simple(&process->rule);
@@ -178,6 +184,39 @@ extern "C" {
   }
 #endif // _di_controller_process_delete_simple_
 
+#ifndef _di_controller_process_wait_
+  void controller_process_wait(const controller_main_t main, controller_process_t *process) {
+
+    if (!main.thread->enabled) return;
+
+    struct timespec time;
+    time.tv_sec = controller_thread_wait_timeout_seconds;
+    time.tv_nsec = controller_thread_wait_timeout_nanoseconds;
+
+    f_status_t status = F_none;
+
+    do {
+      f_thread_mutex_lock(&process->wait_lock);
+
+      status = f_thread_condition_wait_timed(&time, &process->wait, &process->wait_lock);
+
+      f_thread_mutex_unlock(&process->wait_lock);
+
+      if (!main.thread->enabled) break;
+
+      f_thread_lock_read(&process->lock);
+
+      if (process->status != F_known_not || !(process->state == controller_process_state_active || process->state == controller_process_state_busy)) {
+        f_thread_unlock(&process->lock);
+
+        break;
+      }
+
+      f_thread_unlock(&process->lock);
+    } while (main.thread->enabled);
+  }
+#endif // _di_controller_process_wait_
+
 #ifndef _di_controller_processs_delete_simple_
   void controller_processs_delete_simple(controller_processs_t *processs) {
 
@@ -212,28 +251,41 @@ extern "C" {
     f_status_t status = F_none;
 
     for (f_array_length_t i = length; i < processs->size; ++i) {
-      controller_process_delete_simple(&processs->array[i]);
+
+      if (processs->array[i]) {
+        controller_process_delete_simple(processs->array[i]);
+
+        f_memory_delete(1, sizeof(f_array_length_t *), (void **) & processs->array[i]);
+      }
     } // for
 
     status = f_memory_resize(processs->size, length, sizeof(controller_process_t), (void **) & processs->array);
 
     if (F_status_is_error_not(status) && length) {
 
+      controller_process_t *process = 0;
+
       // the lock must be initialized, but only once, so initialize immediately upon allocation.
       for (; processs->size < length; ++processs->size) {
 
-        status = f_thread_lock_create(0, &processs->array[processs->size].lock);
+        status = f_memory_new(1, sizeof(controller_process_t), (void **) &processs->array[processs->size]);
+
+        if (F_status_is_error_not(status)) {
+          process = processs->array[processs->size];
+
+          status = f_thread_lock_create(0, &process->lock);
+        }
 
         if (F_status_is_error_not(status)) {
-          status = f_thread_lock_create(0, &processs->array[processs->size].active);
+          status = f_thread_lock_create(0, &process->active);
         }
 
         if (F_status_is_error_not(status)) {
-          status = f_thread_condition_create(0, &processs->array[processs->size].wait);
+          status = f_thread_condition_create(0, &process->wait);
         }
 
         if (F_status_is_error_not(status)) {
-          status = f_thread_mutex_create(0, &processs->array[processs->size].wait_lock);
+          status = f_thread_mutex_create(0, &process->wait_lock);
         }
 
         if (F_status_is_error(status)) {
index a90a8021af836d8de750d29683ab47276aa41f1c..2b27685d89de837c38ca79e82d05c4f854727084 100644 (file)
@@ -828,13 +828,15 @@ extern "C" {
 /**
  * The Rule Processes.
  *
+ * Each process is a pointer of a process, to preserve memory locations that may ultimately change due to the resizing the array.
+ *
  * array: An array of rule processes.
  * size:  Total amount of allocated space.
  * used:  Total number of allocated spaces used.
  */
 #ifndef _di_controller_processs_t_
   typedef struct {
-    controller_process_t *array;
+    controller_process_t **array;
 
     f_array_length_t size;
     f_array_length_t used;
@@ -1073,6 +1075,9 @@ extern "C" {
   //#define controller_thread_exit_force_timeout 60  // 1 minute in seconds.
   #define controller_thread_exit_process_force_timeout 2000000 // 2 seconds in microseconds.
   #define controller_thread_exit_main_force_timeout 100000 // 0.1 seconds in microseconds.
+  #define controller_thread_simulation_timeout 200000 // 0.2 seconds in microseconds.
+  #define controller_thread_wait_timeout_seconds 10
+  #define controller_thread_wait_timeout_nanoseconds 0
 
   typedef struct {
     bool enabled;
@@ -1327,6 +1332,20 @@ extern "C" {
   extern void controller_process_delete_simple(controller_process_t *process) f_gcc_attribute_visibility_internal;
 #endif // _di_controller_process_delete_simple_
 
+/***
+ * Safely wait for a process, periodically checking to see if process completed or check if exiting.
+ *
+ * @param main
+ *   The main data.
+ * @param process
+ *   The process to wait on.
+ *
+ * @see f_thread_condition_wait_timed()
+ */
+#ifndef _di_controller_process_wait_
+  extern void controller_process_wait(const controller_main_t main, controller_process_t *process) f_gcc_attribute_visibility_internal;
+#endif // _di_controller_process_wait_
+
 /**
  * Fully deallocate all memory for the given processs without caring about return status.
  *
index 7222dcabae2bfb1069398d1d7b368d61a944dfc4..8c69c0017427d4e62423b671dafe6af47d518f3a 100644 (file)
@@ -251,7 +251,7 @@ extern "C" {
 
     for (f_array_length_t i = 0; i < processs.used; ++i) {
 
-      if (fl_string_dynamic_compare(alias, processs.array[i].rule.alias) == F_equal_to) {
+      if (processs.array[i] && fl_string_dynamic_compare(alias, processs.array[i]->rule.alias) == F_equal_to) {
         if (at) *at = i;
         return F_true;
       }
@@ -1045,10 +1045,10 @@ extern "C" {
                 if (F_status_is_error(status)) {
                   controller_entry_error_print(main.data->error, cache->action, F_status_set_fine(status), "controller_processs_increase", F_true, main.thread);
                 }
-                else {
+                else if (main.thread->processs.array[main.thread->processs.used]) {
 
                   // only copy the rule alias, as that is all that is needed at this point (the entire rule gets copied prior to executing/processing).
-                  controller_process_t *process = &main.thread->processs.array[main.thread->processs.used];
+                  controller_process_t *process = main.thread->processs.array[main.thread->processs.used];
 
                   f_thread_lock_write(&process->lock);
 
@@ -1225,6 +1225,9 @@ extern "C" {
           break;
         }
       }
+      else {
+        cache->ats.array[at_j]++;
+      }
     } // for
 
     if (status == F_child || status == F_signal) {
index 97183d1bbac0ef69e9ad3fdaf9b2939976e83f67..af5724b1e14213d6156842b5f6dd54f251551749 100644 (file)
@@ -978,7 +978,7 @@ extern "C" {
 
     f_status_t status = F_none;
     int result = 0;
-    pid_t id_process = 0;
+    pid_t id_child = 0;
 
     if (options & controller_rule_option_simulate) {
       if (main.data->error.verbosity != f_console_verbosity_quiet) {
@@ -1001,16 +1001,16 @@ extern "C" {
       }
 
       // sleep for less than a second to better show simulation of synchronous vs asynchronous.
-      usleep(200000);
+      usleep(controller_thread_simulation_timeout);
 
       const f_string_static_t simulated_program = f_macro_string_static_t_initialize(f_string_empty_s, 0);
       const f_string_statics_t simulated_arguments = f_string_statics_t_initialize;
       fl_execute_parameter_t simulated_parameter = fl_macro_execute_parameter_t_initialize(execute_set->parameter.option, execute_set->parameter.wait, execute_set->parameter.environment, execute_set->parameter.signals, &simulated_program);
 
-      status = fll_execute_program(controller_default_program_script, simulated_arguments, &simulated_parameter, &execute_set->as, simulated_parameter.option & fl_execute_parameter_option_return ? (void *) &result : (void *) &id_process);
+      status = fll_execute_program(controller_default_program_script, simulated_arguments, &simulated_parameter, &execute_set->as, simulated_parameter.option & fl_execute_parameter_option_return ? (void *) &result : (void *) &id_child);
     }
     else {
-      status = fll_execute_program(program, arguments, &execute_set->parameter, &execute_set->as, execute_set->parameter.option & fl_execute_parameter_option_return ? (void *) &result : (void *) &id_process);
+      status = fll_execute_program(program, arguments, &execute_set->parameter, &execute_set->as, execute_set->parameter.option & fl_execute_parameter_option_return ? (void *) &result : (void *) &id_child);
     }
 
     if (status == F_parent) {
@@ -1020,13 +1020,13 @@ extern "C" {
       f_thread_lock_write(&process->lock);
 
       // assign the child process id to allow for the cancel process to send appropriate termination signals to the child process.
-      process->child = id_process;
+      process->child = id_child;
 
       f_thread_unlock(&process->lock);
       f_thread_lock_read(&process->lock);
 
       // have the parent wait for the child process to finish.
-      waitpid(id_process, &result, 0);
+      waitpid(id_child, &result, 0);
 
       f_thread_unlock(&process->lock);
       f_thread_lock_write(&process->lock);
@@ -1123,7 +1123,7 @@ extern "C" {
       }
 
       // sleep for less than a second to better show simulation of synchronous vs asynchronous.
-      usleep(200000);
+      usleep(controller_thread_simulation_timeout);
 
       const f_string_static_t simulated_program = f_macro_string_static_t_initialize(f_string_empty_s, 0);
       const f_string_statics_t simulated_arguments = f_string_statics_t_initialize;
@@ -1147,13 +1147,13 @@ extern "C" {
       f_thread_unlock(&process->lock);
       f_thread_lock_read(&process->lock);
 
-      // have the parent wait for the child process to finish.
+      // have the parent wait for the child process to finish. @todo do not wait, this is a background execution!
       waitpid(id_process, &result, 0);
 
       f_thread_unlock(&process->lock);
       f_thread_lock_write(&process->lock);
 
-      // remove the pid now that waidpid() has returned.
+      // remove the pid now that waidpid() has returned. @todo do not clear until forked execution is known to have exited, this is a background execution
       process->child = 0;
 
       f_thread_unlock(&process->lock);
@@ -1729,7 +1729,7 @@ extern "C" {
             f_thread_unlock(&main.thread->lock.rule);
             f_thread_lock_read(&main.thread->lock.process);
 
-            process_other = &main.thread->processs.array[id_process];
+            process_other = main.thread->processs.array[id_process];
 
             f_thread_lock_read(&process_other->active);
             f_thread_lock_read(&process_other->lock);
@@ -1737,11 +1737,7 @@ extern "C" {
             if (process_other->status == F_known_not && (process_other->state == controller_process_state_active || process_other->state == controller_process_state_busy)) {
               f_thread_unlock(&process_other->lock);
 
-              if (main.thread->enabled) {
-                f_thread_mutex_lock(&process_other->wait_lock);
-                f_thread_condition_wait(&process_other->wait, &process_other->wait_lock);
-                f_thread_mutex_unlock(&process_other->wait_lock);
-              }
+              controller_process_wait(main, process_other);
             }
             else {
               f_thread_unlock(&process_other->lock);
@@ -1989,7 +1985,7 @@ extern "C" {
         return status;
       }
 
-      process = &main.thread->processs.array[at];
+      process = main.thread->processs.array[at];
 
       f_thread_lock_read(&process->active);
       f_thread_lock_write(&process->lock);
@@ -4606,14 +4602,14 @@ extern "C" {
 
     for (; i < main.thread->processs.used && main.thread->enabled; ++i, skip = F_false) {
 
-      process = &main.thread->processs.array[i];
+      process = main.thread->processs.array[i];
 
       if (caller) {
         f_thread_lock_read(&main.thread->lock.rule);
 
         for (j = 0; j < caller->stack.used; ++j) {
 
-          if (fl_string_dynamic_compare(process->rule.alias, main.thread->processs.array[caller->stack.array[j]].rule.alias) == F_equal_to) {
+          if (main.thread->processs.array[caller->stack.array[j]] && fl_string_dynamic_compare(process->rule.alias, main.thread->processs.array[caller->stack.array[j]]->rule.alias) == F_equal_to) {
             skip = F_true;
           }
 
@@ -4636,9 +4632,16 @@ extern "C" {
           f_thread_lock_write(&process->lock);
 
           if (process->state == controller_process_state_done) {
-            f_thread_join(process->id_thread, 0);
-            process->state = controller_process_state_idle;
-            process->id_thread = 0;
+            f_thread_unlock(&process->active);
+
+            if (f_thread_lock_write_try(&process->active) == F_none) {
+              f_thread_join(process->id_thread, 0);
+              process->state = controller_process_state_idle;
+              process->id_thread = 0;
+            }
+            else {
+              f_thread_lock_read(&process->active);
+            }
           }
         }
 
@@ -4649,10 +4652,14 @@ extern "C" {
         continue;
       }
 
-      f_thread_unlock(&process->lock);
-      f_thread_mutex_lock(&process->wait_lock);
-      f_thread_condition_wait(&process->wait, &process->wait_lock);
-      f_thread_mutex_unlock(&process->wait_lock);
+      if (process->state == controller_process_state_active || process->state == controller_process_state_busy) {
+        f_thread_unlock(&process->lock);
+
+        controller_process_wait(main, process);
+      }
+      else {
+        f_thread_unlock(&process->lock);
+      }
 
       f_thread_unlock(&process->active);
       f_thread_lock_read(&main.thread->lock.process);
index a862844c73686b6bbfa231eacc79f543fd72ed74..b8b6c85c20533a94885182574bee997953dd0c50 100644 (file)
@@ -27,20 +27,25 @@ extern "C" {
 
         f_array_length_t i = 0;
 
-        for (; i < main->thread->processs.used && main->thread->enabled; ++i) {
+        for (; i < main->thread->processs.size && main->thread->enabled; ++i) {
 
-          process = &main->thread->processs.array[i];
+          if (!main->thread->processs.array[i]) continue;
 
+          process = main->thread->processs.array[i];
+
+          // if "active" has a read lock, then do not attempt to clean it.
           if (f_thread_lock_write_try(&process->active) != F_none) {
             continue;
           }
 
+          // if "lock" has a read or write lock, then do not attempt to clean it.
           if (f_thread_lock_write_try(&process->lock) != F_none) {
             f_thread_unlock(&process->active);
 
             continue;
           }
 
+          // if process is active or busy, then do not attempt to clean it.
           if (process->state == controller_process_state_active || process->state == controller_process_state_busy) {
             f_thread_unlock(&process->active);
             f_thread_unlock(&process->lock);
@@ -48,9 +53,17 @@ extern "C" {
             continue;
           }
 
+          f_thread_unlock(&process->lock);
+
           if (process->id_thread) {
             f_thread_join(process->id_thread, 0);
 
+            if (!main->thread->enabled) {
+              f_thread_unlock(&process->active);
+
+              break;
+            }
+
             f_thread_lock_write(&process->lock);
 
             process->state = controller_process_state_idle;
@@ -64,24 +77,30 @@ extern "C" {
           f_type_array_lengths_resize(0, &process->stack);
 
           f_thread_unlock(&process->active);
-          f_thread_unlock(&process->lock);
         } // for
 
-        if (main->thread->processs.used) {
-          for (i = main->thread->processs.used - 1; main->thread->processs.used && main->thread->enabled; --i) {
+        if (main->thread->processs.size) {
+          f_array_length_t j = main->thread->processs.size;
+
+          for (i = main->thread->processs.size - 1; j && main->thread->enabled; --i, --j) {
 
-            process = &main->thread->processs.array[i];
+            if (!main->thread->processs.array[i]) continue;
 
+            process = main->thread->processs.array[i];
+
+            // if "active" has a read lock, then do not attempt to clean it.
             if (f_thread_lock_write_try(&process->active) != F_none) {
               break;
             }
 
+            // if "lock" has a read or write lock, then do not attempt to clean it.
             if (f_thread_lock_write_try(&process->lock) != F_none) {
               f_thread_unlock(&process->active);
 
               break;
             }
 
+            // if process is active or busy, then do not attempt to clean it.
             if (process->state == controller_process_state_active || process->state == controller_process_state_busy) {
               f_thread_unlock(&process->active);
               f_thread_unlock(&process->lock);
@@ -89,9 +108,17 @@ extern "C" {
               break;
             }
 
+            f_thread_unlock(&process->lock);
+
             if (process->id_thread) {
               f_thread_join(process->id_thread, 0);
 
+              if (!main->thread->enabled) {
+                f_thread_unlock(&process->active);
+
+                break;
+              }
+
               f_thread_lock_write(&process->lock);
 
               process->state = controller_process_state_idle;
@@ -107,7 +134,6 @@ extern "C" {
             --main->thread->processs.used;
 
             f_thread_unlock(&process->active);
-            f_thread_unlock(&process->lock);
           } // for
         }
 
@@ -140,21 +166,23 @@ extern "C" {
     // @todo redesign this to use timed waits, that include a counter and a max wait such that when max wait is reached, send kill signals.
     //       this would, in theory, allow faster exits without as much waiting when there is nothing to wait for.
 
-    if (main->thread->processs.used) {
+    if (main->thread->processs.size) {
       f_thread_unlock(&main->thread->lock.process);
 
       usleep(controller_thread_exit_process_force_timeout);
 
       f_thread_lock_read(&main->thread->lock.process);
 
-      for (f_array_length_t i = 0; i < main->thread->processs.used; ++i) {
+      for (f_array_length_t i = 0; i < main->thread->processs.size; ++i) {
+
+        if (!main->thread->processs.array[i]) continue;
 
-        if (main->thread->processs.array[i].child > 0) {
-          f_signal_send(F_signal_kill, main->thread->processs.array[i].child);
+        if (main->thread->processs.array[i]->child > 0) {
+          f_signal_send(F_signal_kill, main->thread->processs.array[i]->child);
         }
 
-        if (main->thread->processs.array[i].id_thread) {
-          f_thread_signal(main->thread->processs.array[i].id_thread, F_signal_kill);
+        if (main->thread->processs.array[i]->id_thread) {
+          f_thread_signal(main->thread->processs.array[i]->id_thread, F_signal_kill);
         }
       } // for
     }
@@ -386,42 +414,48 @@ extern "C" {
 
     for (; i < main->thread->processs.used; ++i) {
 
-      process = &main->thread->processs.array[i];
+      if (!main->thread->processs.array[i]) continue;
+
+      process = main->thread->processs.array[i];
 
-      f_thread_lock_read(&process->lock);
+      f_thread_lock_read(&process->active);
 
       if (process->child > 0) {
         f_signal_send(F_signal_termination, process->child);
       }
 
-      f_thread_unlock(&process->lock);
+      f_thread_unlock(&process->active);
     } // for
 
     for (i = 0; i < main->thread->processs.used; ++i) {
 
-      process = &main->thread->processs.array[i];
+      if (!main->thread->processs.array[i]) continue;
 
-      f_thread_lock_read(&process->lock);
+      process = main->thread->processs.array[i];
+
+      f_thread_lock_read(&process->active);
 
       if (process->id_thread) {
         f_thread_cancel(process->id_thread);
       }
 
-      f_thread_unlock(&process->lock);
+      f_thread_unlock(&process->active);
     } // for
 
     for (i = 0; i < main->thread->processs.size; ++i) {
 
-      process = &main->thread->processs.array[i];
+      if (!main->thread->processs.array[i]) continue;
+
+      process = main->thread->processs.array[i];
 
       if (process->id_thread) {
         f_thread_join(process->id_thread, 0);
 
-        f_thread_lock_write(&process->lock);
+        f_thread_lock_read(&process->active);
 
         process->id_thread = 0;
 
-        f_thread_unlock(&process->lock);
+        f_thread_unlock(&process->active);
       }
     } // for