This file implements the logic for worker threads built around the dap_worker_t structure. Reactor worker threads handle input/output, and each one is bound to a specific CPU core. The number of threads usually equals the number of CPU cores in the system or is slightly lower. It is set at initialization through the a_threads_count parameter of dap_events_init(); passing 0 makes the module determine the count automatically from the number of available processors.
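
As a rough illustration of the "0 means automatic" convention, the sketch below resolves a requested thread count. The helper name example_resolve_threads_count is hypothetical and not part of the SDK, and sysconf(_SC_NPROCESSORS_ONLN) is only one way to count processors (available on Linux, macOS and the BSDs); the module itself may detect them differently.

```c
#include <assert.h>
#include <stdint.h>
#include <unistd.h>

/* Hypothetical helper, not part of the SDK: mirrors the documented rule
 * that a_threads_count == 0 requests automatic detection. */
static uint32_t example_resolve_threads_count(uint32_t a_threads_count)
{
    if (a_threads_count != 0)
        return a_threads_count;                   /* explicit request wins */
    long l_cpus = sysconf(_SC_NPROCESSORS_ONLN);  /* processors currently online */
    return l_cpus > 0 ? (uint32_t)l_cpus : 1;     /* fall back to a single worker */
}
```

Calling example_resolve_threads_count(4) returns 4, while example_resolve_threads_count(0) returns whatever processor count the system reports, and never less than 1.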

To start the threads, initialize the module with dap_events_init() and then start the event loop with dap_events_start(). All other system components can be initialized after that.

Worker threads are described by dap_worker_t objects, which can be reached through the .worker field of a dap_events_socket_t object or looked up by CPU core number with the dap_events_worker_get() function. The dap_events_worker_get_auto() function is also available; it selects the least loaded thread automatically, which helps distribute the load evenly across the workers and processors in the system.

The main processing loop is located within the dap_worker_thread_loop() function.

STRUCTURES

Worker

typedef struct dap_worker {
    uint32_t id;
    dap_proc_thread_t *proc_queue_input;
    dap_events_socket_t *queue_callback;
    dap_timerfd_t *timer_check_activity;
    dap_context_t *context;
    void *_inheritor;
    /* Platform-related fields (all OSes except Windows) */
    dap_events_socket_t *queue_es_new;
    dap_events_socket_t *queue_es_delete;
    dap_events_socket_t *queue_es_reassign;
    dap_events_socket_t *queue_es_io;
} dap_worker_t;
 

The dap_worker_t structure describes one worker in the event-driven, multi-threaded system. Its fields link the worker to its processing thread, its activity timer, its execution context, and the queues through which it receives event sockets and callbacks.

Fields:

  1. id: This is a uint32_t field that stores the unique identifier of the worker.
  2. proc_queue_input: A pointer to a queue structure (dap_proc_thread_t), which is used to manage and store processing threads.
  3. queue_callback: A pointer to a dap_events_socket_t structure, representing a queue that handles callback events related to event sockets.
  4. timer_check_activity: A pointer to a dap_timerfd_t structure, which stores the timer responsible for checking the worker’s activity.
  5. context: A pointer to a dap_context_t structure, representing the execution context in which the worker operates.
  6. _inheritor: A void pointer that can reference the worker’s inheritor, that is, an object extending the worker with derived functionality.

Platform-related fields (for all OSes except Windows):

  1. queue_es_new: A pointer to a dap_events_socket_t structure that represents a queue used for handling new event socket creation.
  2. queue_es_delete: A pointer to a dap_events_socket_t structure that represents a queue used to handle the removal of event sockets.
  3. queue_es_reassign: A pointer to a dap_events_socket_t structure that manages the reassignment of event sockets between workers.
  4. queue_es_io: A pointer to a dap_events_socket_t structure used for managing I/O operations in the worker’s event socket.

These fields support the worker’s tasks related to event handling, processing threads, and socket management, especially in environments where multiple workers may need to handle socket events in a coordinated manner.

Message for reassignment

typedef struct dap_worker_msg_reassign {
 
    dap_events_socket_uuid_t esocket_uuid;
    dap_worker_t *worker_new;
    
} dap_worker_msg_reassign_t;
 

The dap_worker_msg_reassign_t structure is used to represent a message related to the reassignment of an event socket between workers in a system. It contains information about the event socket and the new worker to which the socket is being reassigned.

Fields:

  1. dap_events_socket_uuid_t esocket_uuid: This field stores the unique identifier (UUID) of the event socket that is being reassigned.
  2. dap_worker_t *worker_new: This field is a pointer to the new worker to which the event socket is being reassigned.
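
The sketch below shows how a handler might consume such a message. All example_ types and functions are simplified, hypothetical stand-ins rather than the SDK's definitions, and the UUID is modelled as a plain 64-bit integer. It also illustrates one plausible reason to carry a UUID instead of a pointer: the socket may already be gone when the message is processed, and a failed lookup is harmless.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-ins; none of these are the SDK's real definitions. */
typedef uint64_t example_uuid_t;
typedef struct example_worker { unsigned id; size_t esocket_count; } example_worker_t;
typedef struct example_esocket { example_uuid_t uuid; example_worker_t *worker; } example_esocket_t;
typedef struct example_msg_reassign {
    example_uuid_t esocket_uuid;
    example_worker_t *worker_new;
} example_msg_reassign_t;

/* Looks the socket up by UUID and moves it to the new worker.
 * Returns 0 on success, -1 if the message is invalid or the socket is not found. */
static int example_handle_reassign(example_esocket_t *a_sockets, size_t a_count,
                                   const example_msg_reassign_t *a_msg)
{
    if (!a_sockets || !a_msg || !a_msg->worker_new)
        return -1;
    for (size_t i = 0; i < a_count; i++) {
        if (a_sockets[i].uuid != a_msg->esocket_uuid)
            continue;
        if (a_sockets[i].worker)
            a_sockets[i].worker->esocket_count--;  /* detach from the old worker */
        a_sockets[i].worker = a_msg->worker_new;   /* attach to the new worker */
        a_msg->worker_new->esocket_count++;
        return 0;
    }
    return -1;  /* stale UUID: the socket no longer exists */
}
```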

Message for input/output queue

typedef struct dap_worker_msg_io {
 
    dap_events_socket_uuid_t esocket_uuid;
    size_t data_size;
    void *data;
    uint32_t flags_set;
    uint32_t flags_unset;
    
} dap_worker_msg_io_t;
 

The dap_worker_msg_io_t structure represents a data type used for passing input/output (I/O) messages in the context of event-driven or multi-threaded applications.

Fields:

  1. dap_events_socket_uuid_t esocket_uuid: This field represents the unique identifier for the event socket associated with this message.
  2. size_t data_size: This is the size of the data pointed to by the data field. It helps in correctly interpreting or processing the transmitted data.
  3. void *data: A pointer to the data being transmitted through this message. Since this is a void *, it allows flexibility, enabling data of any type to be passed.
  4. uint32_t flags_set: Flags that need to be set during message processing. This is a 32-bit integer where each bit may represent a specific option or state that should be enabled.
  5. uint32_t flags_unset: Flags that need to be unset during message processing. Similar to the previous field, this is a 32-bit integer where each bit corresponds to a flag or state that should be disabled.
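
To make the two flag fields concrete, here is a minimal sketch of how a pair of set/unset masks can be applied to a socket's current flags. The flag names and the order of operations (set first, then unset) are assumptions made for illustration; the SDK defines its own flag bits and may apply them differently.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical flag bits, for illustration only. */
#define EXAMPLE_FLAG_READ  0x1u
#define EXAMPLE_FLAG_WRITE 0x2u

/* Enables every bit in a_flags_set, then disables every bit in a_flags_unset. */
static uint32_t example_apply_io_flags(uint32_t a_current,
                                       uint32_t a_flags_set,
                                       uint32_t a_flags_unset)
{
    a_current |= a_flags_set;
    a_current &= ~a_flags_unset;
    return a_current;
}
```

With this ordering, a bit that appears in both masks ends up disabled.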

Message callback

typedef struct dap_worker_msg_callback {
 
    dap_worker_callback_t callback; 
    void * arg;
    
} dap_worker_msg_callback_t;
 

The dap_worker_msg_callback_t structure represents a message that carries a callback function and its argument. Structures of this kind are common in event-driven and asynchronous systems, where operations are triggered through callbacks.

Fields:

  1. dap_worker_callback_t callback: A function pointer to the callback that is to be executed for a specific client operation.
  2. void *arg: This is a pointer to an argument that will be passed to the callback function when it is invoked.
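
The following sketch shows the general pattern of dispatching such a message once it reaches the worker. The example_ types are hypothetical stand-ins, and the callback signature used here (worker plus argument) is an assumption, not a statement about how dap_worker_callback_t is declared.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins; the callback signature is an assumption. */
typedef struct example_worker { unsigned id; } example_worker_t;
typedef void (*example_worker_callback_t)(example_worker_t *a_worker, void *a_arg);

typedef struct example_msg_callback {
    example_worker_callback_t callback;
    void *arg;
} example_msg_callback_t;

/* Runs the callback carried by the message, passing the stored argument. */
static void example_dispatch(example_worker_t *a_worker, const example_msg_callback_t *a_msg)
{
    assert(a_worker && a_msg);
    if (a_msg->callback)
        a_msg->callback(a_worker, a_msg->arg);
}

/* Sample callback: adds the worker id to an int counter passed through a_arg. */
static void example_add_id(example_worker_t *a_worker, void *a_arg)
{
    *(int *)a_arg += (int)a_worker->id;
}
```

Because arg is a void pointer, the sender and the callback have to agree on the real type of the data; here both sides treat it as a pointer to int.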

FUNCTIONS

Get current worker

dap_worker_t *dap_worker_get_current ()
---
Return: dap_worker_t * (worker)

Arguments:

Function receives no arguments.

Description:

Returns the worker that is currently in use.

Worker initialization

int dap_worker_init( size_t a_conn_timeout )
---
Return: int (0 if everything is ok)

Arguments:

  1. size_t a_conn_timeout: Connection timeout in seconds.

Description:

The worker is initialized together with event initialization in dap_events.

In case of successful initialization, the function returns 0.

Worker deinitialization

void dap_worker_deinit( )
---
Return: void

Arguments:

Function receives no arguments.

Description:

Deinitializes the worker.

Callback that assigns a context to the worker

int dap_worker_context_callback_started(dap_context_t *a_context, void *a_arg)
---
Return: int (0 if everything is ok)

Arguments:

  1. dap_context_t *a_context: This parameter represents the context in which the callback is executed. The dap_context_t type contains information about the current environment, state, or configuration needed for the worker to function correctly.
  2. void *a_arg: This is a pointer to an argument that is passed to the callback. The argument is a void *, which means it can point to any type of data.

Description:

At the start of its execution, the function creates pointers to the dap_worker_t structure. This function works closely with the dap_context module, sequentially calling the functions to create queue and event contexts.

The function returns -1 if it fails to initialize the context for the given platform.

Callback that stops worker’s context

int dap_worker_context_callback_stopped(dap_context_t *a_context, void *a_arg)
---
Return: int (0 if everything is ok)

Arguments:

  1. dap_context_t *a_context: This is a pointer to a structure of type dap_context_t, which represents the context of the worker. The context reflects the environment and conditions in which request processing takes place.
  2. void *a_arg: This is a pointer to a memory area that can contain data of any type. The parameter a_arg is used to pass additional data to the function.

Description:

At the beginning of the function, both arguments are checked for validity. If everything is correct, the function continues its execution; otherwise, it returns an error -1.

The function removes the event socket from the worker’s context and then deletes it.

After checking the worker pointer, a message is displayed indicating that the context of the current worker thread has been removed.

Add event sockets to the worker (straight)

int dap_worker_add_events_socket_unsafe(dap_worker_t *a_worker, dap_events_socket_t *a_esocket)
---
Return: int (error code)

Arguments:

  1. dap_worker_t *a_worker: This is a pointer to a dap_worker_t structure, which represents the worker to which the event socket will be added.
  2. dap_events_socket_t *a_esocket: This is a pointer to a dap_events_socket_t structure, which represents the event socket that needs to be added to the worker.

Description:

The function adds an event socket to the worker thread’s context.

Possible errors:

  • -1 - “Can’t add NULL esocket to the context” if a NULL event socket is provided.
  • -2 - “Can’t add esocket to the bad context” if the context type is not a worker.
  • The function will also return an error if it is not supported for the given socket format or platform.
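
The error codes above suggest a validation order along the following lines. This is only a sketch with hypothetical example_ types: it takes the context directly instead of a worker, treats a missing context as a bad one, and leaves out the platform-specific registration step entirely.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins for the context and event socket types. */
typedef enum { EXAMPLE_CONTEXT_WORKER, EXAMPLE_CONTEXT_OTHER } example_context_type_t;
typedef struct example_context { example_context_type_t type; int esocket_count; } example_context_t;
typedef struct example_esocket { example_context_t *context; } example_esocket_t;

static int example_add_esocket_unsafe(example_context_t *a_context, example_esocket_t *a_esocket)
{
    if (!a_esocket)
        return -1;  /* "Can't add NULL esocket to the context" */
    if (!a_context || a_context->type != EXAMPLE_CONTEXT_WORKER)
        return -2;  /* "Can't add esocket to the bad context" */
    a_esocket->context = a_context;  /* bind the socket to the context */
    a_context->esocket_count++;
    return 0;
}
```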

Add event sockets to the worker

void dap_worker_add_events_socket(dap_worker_t *a_worker, dap_events_socket_t *a_events_socket)
---
Return: void

Arguments:

  1. dap_worker_t *a_worker: A pointer to a dap_worker_t structure, which represents the worker to which the event socket will be added.
  2. dap_events_socket_t *a_events_socket: A pointer to a dap_events_socket_t structure, which represents the event socket that will be associated with the worker.

Description:

This function adds an event socket to the worker thread. It first checks whether the caller is already running in the context of the given worker thread. If so, the event socket is assigned to the worker directly. Otherwise, a pointer to the event socket is sent to the worker’s queue through a thread-safe function.

While the event socket is being bound to the worker, two errors may occur:

  • 0 - “Can’t assign esocket to worker”
  • 1 - “Can’t send pointer in queue”
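
The two paths can be sketched as follows. Everything here is hypothetical: the current worker is passed in explicitly rather than looked up, the result codes are invented for the example, and a plain array stands in for the real queue. A real implementation needs a thread-safe queue, because the second path is taken from a different thread than the one that owns the worker.

```c
#include <assert.h>
#include <stddef.h>

#define EXAMPLE_QUEUE_CAP 8

typedef struct example_esocket { int fd; } example_esocket_t;

typedef struct example_worker {
    size_t esocket_count;                           /* sockets attached directly */
    example_esocket_t *pending[EXAMPLE_QUEUE_CAP];  /* stand-in for a thread-safe queue */
    size_t pending_count;
} example_worker_t;

/* Result codes invented for this sketch. */
typedef enum {
    EXAMPLE_ADD_DIRECT,  /* caller was on the target worker */
    EXAMPLE_ADD_QUEUED,  /* pointer handed over through the queue */
    EXAMPLE_ADD_FAILED
} example_add_result_t;

static example_add_result_t example_add_esocket(example_worker_t *a_current,
                                                example_worker_t *a_target,
                                                example_esocket_t *a_es)
{
    if (!a_target || !a_es)
        return EXAMPLE_ADD_FAILED;
    if (a_current == a_target) {
        a_target->esocket_count++;  /* same thread: attach immediately */
        return EXAMPLE_ADD_DIRECT;
    }
    if (a_target->pending_count == EXAMPLE_QUEUE_CAP)
        return EXAMPLE_ADD_FAILED;  /* queue is full */
    a_target->pending[a_target->pending_count++] = a_es;
    return EXAMPLE_ADD_QUEUED;
}
```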

Execution worker’s callback

void dap_worker_exec_callback_on(dap_worker_t * a_worker, dap_worker_callback_t a_callback, void * a_arg)
---
Return: void

Arguments:

  1. dap_worker_t *a_worker: A pointer to a dap_worker_t structure, which represents the worker on which the callback will be executed.
  2. dap_worker_callback_t a_callback: The callback function to be executed by the worker.
  3. void *a_arg: A pointer to a memory area that can hold data of any type. In this case, it is used to pass the socket ID (or other relevant data) to the callback function.

Description:

This function executes the callback a_callback on the worker a_worker, passing it a_arg. Typical uses include handling a socket, processing data, or responding to a request.

Add events socket to the least loaded worker

dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t *a_es)
---
Return: dap_worker_t * (worker)

Arguments:

  1. dap_events_socket_t *a_es: A pointer to a dap_events_socket_t structure, which represents the event socket that will be added to the worker.

Description:

This function finds the worker with the fewest attached sockets and adds the socket to it.

The function returns the worker to which the event socket was added.
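
A minimal sketch of the selection step, using a hypothetical example_worker_t that tracks only a socket count. How the SDK actually measures load and stores its workers is not shown here; the tie-breaking rule (first worker wins) is likewise an assumption.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-in: only the fields needed for the selection. */
typedef struct example_worker { unsigned id; size_t esocket_count; } example_worker_t;

/* Returns the worker with the fewest sockets; the first one wins a tie. */
static example_worker_t *example_worker_get_min(example_worker_t *a_workers, size_t a_count)
{
    if (!a_workers || a_count == 0)
        return NULL;
    example_worker_t *l_min = &a_workers[0];
    for (size_t i = 1; i < a_count; i++)
        if (a_workers[i].esocket_count < l_min->esocket_count)
            l_min = &a_workers[i];
    return l_min;
}

/* Picks the least loaded worker, accounts for the new socket, and returns that worker. */
static example_worker_t *example_add_esocket_auto(example_worker_t *a_workers, size_t a_count)
{
    example_worker_t *l_worker = example_worker_get_min(a_workers, a_count);
    if (l_worker)
        l_worker->esocket_count++;
    return l_worker;
}
```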

Main worker loop

int dap_worker_thread_loop(dap_context_t * a_context)
---
Return: int (error code)

Arguments:

  1. dap_context_t * a_context: This is a pointer to a structure of type dap_context_t, which represents the context of the worker. The context reflects the environment and conditions in which request processing will take place.

Description:

The body of this function processes each event socket according to its descriptor and the flags set on it.
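
As a loose illustration of flag-driven handling, the sketch below performs a single pass over a set of sockets and counts the read and write actions it would take. The flag names and the example_esocket_t type are hypothetical, and the real loop waits for events using the platform's polling mechanism, which this sketch leaves out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical readiness flags, for illustration only. */
#define EXAMPLE_READY_TO_READ  0x1u
#define EXAMPLE_READY_TO_WRITE 0x2u

typedef struct example_esocket {
    uint32_t flags;
    int reads;   /* number of read actions taken */
    int writes;  /* number of write actions taken */
} example_esocket_t;

/* One simplified pass over the sockets; returns how many had work to do. */
static size_t example_loop_pass(example_esocket_t *a_sockets, size_t a_count)
{
    size_t l_handled = 0;
    for (size_t i = 0; i < a_count; i++) {
        example_esocket_t *l_es = &a_sockets[i];
        if (l_es->flags & EXAMPLE_READY_TO_READ)
            l_es->reads++;   /* a real loop would read from the descriptor here */
        if (l_es->flags & EXAMPLE_READY_TO_WRITE)
            l_es->writes++;  /* a real loop would flush the output buffer here */
        if (l_es->flags & (EXAMPLE_READY_TO_READ | EXAMPLE_READY_TO_WRITE))
            l_handled++;
    }
    return l_handled;
}
```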