Работа с вводом/выводом
Реактор ввода/вывода
Реактор ввода вывода базируется на таких функциях как poll(), epoll(), select() и других (kqueue), которые позволяют группировать обработчики ввода-вывода (сокеты, дескрипторы файлов, именованные каналы и прочее) вместе, позволяя гарантированно последовательно обрабатывать ввод-вывод, что избавляет от необходимости использовать семафоры, мьютексы и иные объекты ядра для синхронизации данных между обработчиками. Также этот подход имеет ряд других преимуществ, в частности при большом количестве одновременных соединений не происходит частого переключения контекстов потоков обработки, съедающих до 30% системного времени при большом количестве соединений.
Реактор состоит из потоков-обработчиков, каждый из которых привязан к своему процессору в системе - переключение контекстов без смены процессора гораздо быстрее, чем с ней, к тому же выделенный поток обработки сигнализирует драйверам и подсистеме ввода/вывода, что тут можно применять ряд иных оптимизирующих техник, навроде FlowControl при которых балансировка между аппаратными очередями происходит непосредственно процессором сетевой карты, которые затем транслируются напрямую в выделенные для отдельных процессоров участки памяти и непосредственно на эти же процессоры целенаправленно отправляются прерывания или сообщения об обновлении индексов-чтения записи в кольцевых буферах (зависит от железа). Не все драйвера, операционки, сетевые карты и материнские платы такое поддерживают, виртуальные машины также вносят свою лепту в разнообразие - это всё стоит учитывать при работе с вводом выводом: например пакет из multiqueue tun устройства может приходить как вся сессия в один и тот же дескриптор, так и случайным образом размазываться по остальным дескрипторам на соседних процессорах, так и группироваться на 2-3ёх (из 8и). Внутри потоков обработчиков по условно-бесконечному циклу бегает обработка событий на дескрипторе (epoll() и его коллеги). К каждому дескриптору привязан событийный сокет.
Событийный сокет
Сокет описан структурой dap_events_socket_t, с указателем на экземпляр которой придется часто сталкиваться. Данная структура группирует данные, связанные с системным обработчиком ввода/вывода, в частности коллбэки чтения/записи, а так же ряда других событий. Кроме того она же занимается дополнительной буферизацией данных, более подробно это описано в приложении. В механике есокетов важно учитывать, из какого именно участка кода вы обращаетесь к данному указателю - контекст исполнения. В зависимости от контекста вы можете или не можете обращаться к его полям (в будущих версиях они видимо полностью исчезнут, переместившись в приватную субструктуру), а так же должны будете вызывать функции с постфиксом _mt либо _safe в зависимости от того, работаете ли в собственном контексте есокета или вне него.
Типы
Событийный сокет может быть разных типов для очень разных задач. Их все объединяет то, что они работают через файловые/сокетовые дескрипторы и связаны с определенным типом этих дескрипторов в системе. Типы описаны в перечисляемом типе dap_events_desc_type_t:
● DESCRIPTOR_TYPE_SOCKET сокет для исходящего или входящего соединения ● DESCRIPTOR_TYPE_SOCKET_LISTENING сокет, слушающий порт и принимающий новые соединения ● DESCRIPTOR_TYPE_QUEUE Очередь данных или указателей на них ● DESCRIPTOR_TYPE_PIPE Труба бинарных данных ● DESCRIPTOR_TYPE_TIMER Таймер ● DESCRIPTOR_TYPE_EVENT Событие ● DESCRIPTOR_TYPE_FILE Файловый дескриптор
typedef enum {
DESCRIPTOR_TYPE_SOCKET_CLIENT = 0,
DESCRIPTOR_TYPE_SOCKET_LOCAL_CLIENT,
DESCRIPTOR_TYPE_SOCKET_LISTENING,
DESCRIPTOR_TYPE_SOCKET_LOCAL_LISTENING,
DESCRIPTOR_TYPE_SOCKET_UDP,
DESCRIPTOR_TYPE_SOCKET_CLIENT_SSL,
DESCRIPTOR_TYPE_FILE,
DESCRIPTOR_TYPE_PIPE,
DESCRIPTOR_TYPE_QUEUE,
/* all above are readable/writeable */
DESCRIPTOR_TYPE_TIMER,
DESCRIPTOR_TYPE_EVENT
} dap_events_desc_type_t;
Потоки-обработчики реактора
Потоки-обработчики реактора или иначе воркеры (указатели на dap_worker_t) это то, что обрабатывает ввод-вывод, будучи эксклюзивно привязаны к конкретному CPU в системе, поэтому их количество обычно такое же или немного меньше, чем количество процессоров в системе, - данное количество можно указать при инициализации dap_events_init в параметре a_threads_count либо указать нам 0 и оно будет выбрано автоматически по количество CPU в системе. Для их запуска требуется не только инициализации модуля dap_events_init но и создание объекта dap_events_t с помощью функции dap_events_new и дальнейшего старта вызовом dap_events_start после чего можно спокойно инициализировать всё, что только требуется. При необходимости можно встать в ожидании завершения потоков-обработчиков реактора вызвав dap_events_wait - если приложение построено в основном на DapSDK то это как правило нужно делать в main.c чтобы не завершить основной поток исполнения раньше времени, или принудительно сигнализировать завершение через dap_events_stop_all
Сами потоки описаны объектами dap_worker_t которые в собственном контексте исполнения можно получить из поля.worker событийного сокета, а так же напрямую по номеру CPU через dap_events_worker_get или запросив автоматическую выборку из потоков-обработчиков реактора через dap_events_worker_get_auto - в таком случае будет выбран обработчик с наименьшей нагрузкой. Как можно понять, последнее используется для равномерного распределения нагрузки между обработчиками реактора и CPU.
Процессор коллбэков
Процессор коллбэков как и реактор состоит из потоков-обработчиков так же эксклюзивно привязанных к CPU. В отличии от реактора, процессор коллбэков предназначен для более длительных по времени операций, но при этом всё ещё ограниченных по времени исполнения, так как коллбэки в каждом потоке-обработчике исполняются последовательно в контексте одного и того же потока - неограниченные по длительности операции рекомендуется исполнять в их собственных потоках. Типичное применение коллбэков - перекидывание событийного сокета и связанных с ним объектов из реактора в процессор коллбэков для достаточно длительных операций, например обращение к базе данных и формирование ответа, а затем по завершению забрасывание событийного сокета обратно в реактор.
Очереди коллбэков
Очередь коллбэков нужна для потоко-безопасного приёма новых коллбэков в очередь исполнения, для чего есть функции void dap_proc_queue_add_callback и void dap_proc_queue_add_callback с помощью которых можно разместить коллбэк на автоматически выбранной очереди либо на конкретную указанную очередь. Указатель на экземпляр очереди, связанной с конкретным CPU, есть у объекта dap_worker_t в поле.proc_queue - переключение контекста между потоками на одном и том же CPU это всё же менее времязатратная операция, нежели переключения между потоками на разных CPU, поэтому рекомендуется использовать ту очередь потоков исполнения, что соответствует контексту исполнения, из которого это использование происходит, - кроме тех случаев когда идёт множественное добавление коллбэков из одного и того же контекста и более актуальна задача балансирования нагрузки между CPU.
Потоки-обработчики коллбэков
Аналогичны потокам-обработчикам реактора, только обрабатывают лишь события готовности к чтению для приёма новых коллбэков. Внутри в цикле последовательно выполняются коллбэки, если они возвращают значение true - их удаляет из списка. Важно: коллбэк для процессора должен исполняться ограниченное время, не столь малое как в случае реактора, но слишком большим оно тоже не должно быть, в идеале не более секунды-две. В случе если надо сделать что-то большее - лучше запомнить состояние и выйти с false тогда на следующем цикле коллбэк исполнится снова. Если такое невозможно или предугадать время исполнение не представляется возможным в принцпе - лучше создать свой собственный поток и работать в нём, чтобы не затормаживать исполнение остальных коллбэков в процессоре.
Application
dap_events_worker_get_index_min
DESCRIPTOR_TYPE_SOCKET_LISTENING
dap_events_socket_create_type_queue_ptr_unsafe
dap_events_socket_create_type_queue_ptr_mt
dap_events_socket_queue_proc_input_unsafe
dap_events_socket_create_type_event_unsafe
dap_events_socket_create_type_event_mt
dap_events_socket_event_proc_input_unsafe
dap_events_socket_create_type_pipe_unsafe
dap_events_socket_create_type_pipe_mt
dap_events_socket_queue_ptr_send
dap_events_socket_event_signal
dap_events_socket_assign_on_worker_unsafe
dap_events_socket_assign_on_worker_mt
dap_events_socket_reassign_between_workers_mt
dap_events_socket_pop_from_buf_in
dap_events_socket_check_unsafe
dap_events_socket_set_readable_unsafe
dap_events_socket_set_writable_unsafe
dap_events_socket_write_unsafe
dap_events_socket_write_f_unsafe
dap_events_socket_remove_and_delete_unsafe
dap_events_socket_remove_from_worker_unsafe
dap_events_socket_set_readable_mt
dap_events_socket_set_writable_mt
dap_events_socket_remove_and_delete_mt
dap_events_socket_shrink_buf_in
dap_events
Общее управление событийным реактором ввода-вывода
dap_events_t
dap_events_init
int dap_events_init(uint32_t a_threads_count, size_t a_conn_timeout)
Init server module
dap_events_deinit
void dap_events_deinit()
Deinit server module
dap_events_new
dap_events_t dap_events_new();*
dap_events_get_default
dap_events_t dap_events_get_default();*
dap_events_delete
void dap_events_delete(dap_events_t * a_events);
dap_events_start
*int32_t dap_events_start(dap_events_t a_events)
dap_events_stop_all
void dap_events_stop_all()
dap_events_wait
*int32_t dap_events_wait(dap_events_t a_events)
dap_events_worker_print_all
void dap_events_worker_print_all()
dap_events_worker_get_index_min
uint32_t dap_events_worker_get_index_min()
dap_events_worker_get_count
uint32_t dap_events_worker_get_count()
dap_events_worker_get_auto
*dap_worker_t dap_events_worker_get_auto()
dap_events_worker_get
dap_worker_t * dap_events_worker_get(uint8_t a_index)
dap_get_cpu_count
uint32_t dap_get_cpu_count()
dap_cpu_assign_thread_on
void dap_cpu_assign_thread_on(uint32_t a_cpu_id)
dap_events_socket
Функции и типы данных для работе с dap_events_socket****подключаются через <dap_events_socket.h>.
dap_events_socket_t
Содержит в себе коллбэки, буферы ввода/вывода и ряд иных данных, непосредственно связанных с событийным сокетом. В виду особенностей обработки, указатель на dap_events_socket_t****следует воспринимать как хэндлер вне его собственно контекста и обращаться к полям структуры только внутри собственного контекста, желательно через специальные функции с постфиксом ****unsafe Стоит отдельно обратить внимание, что многие функции сгруппированы попарно
_unsafe****и mt- для потоконебезопасного и потокобезопасного применения. Внутри своего контекста всегда применяйте unsafeвне контекста - ****mt
Указатель на поток-обработчик, в контексте которого обрабатывается событийный сокет. Может быть NULL в случае если событийный покинул контексты реактора и скажем переместился в процессор коллбэков или ещё куда-нибудь.
Если данный событийный сокет представляет с собой соединение с удалённым клиентом, полученное с помощью объекта dap_server_tпредставляющий собой обёртку вокруг слушающего сокета, то это поле будето содержать указатель на этот объект. У всех остальных оно NULL
Юнион хэндлера сокета и файлового дескриптора POSIX, юнион может так же иметь представление в винде нативных хэндлеров Windows, дескриптора mqueue или иных хэндлеров ввода/вывода.
Флаги событийного сокета. Могут быть:
DAP_SOCK_READY_TO_READ
Флаг чтения, если он стоит, то поток-обработчик будет взводить флаг события чтения
(POLLIN, EPOLLIN и иже с ними) при попадании в контекст обработчикаи
соотвественно наоборот, он подымается при вызове функции
dap_events_socket_set_readable_unsafeили dap_events_socket_set_readable_mtвместе
с взведением/снятием флагов события чтения.
DAP_SOCK_READY_TO_WRITE
Флаг чтения, если он стоит, то поток-обработчикбудет взводить флаг события чтения
(POLLIN, EPOLLIN и иже с ними) при попадании в контекст обработчикаи
соотвественно наоборот, он подымается при вызове функции
dap_events_socket_set_writable_mtили dap_events_socket_set_writable_unsafeвместе
с взведением/снятием флагов события чтения.
DAP_SOCK_SIGNAL_CLOSE
Выставление этого флага приведёт к закрытию сокета после выполнения всех коллбэков в его контексте
DAP_SOCK_ACTIVE
На данный момент не используется
DAP_SOCK_REASSIGN_ONCE
Даёт перекинуть есокет между потоками-обработчиками только один раз. Используется для реализации технологии FlowControl чтобы на тех системах, где её нет, не происходило беспорядочное перекидывание событийных сокетов между потоками-обработчиками
DAP_SOCK_QUEUE_PTR
Более подробно тут
Указатель на объект-наследник (если есть)
dap_events_desc_type_t
Перечисляемый тип, содержащий тип событийного сокета. Может быть одним из:
DESCRIPTOR_TYPE_SOCKET
То же самое, что и дескриптор, только вместо read()/write() используются send()/recv()
и немного иначе получаются коды ошибок для error_callback’а, а так всё тоже самое. Для красоты в union объединены поля socket и fd - впрочем, напрямую с ними работать не прийдётся (а в будущем и вообще будет убрана такая возможность). Главное, помнить, что работать напрямую с полями dap_events_socket_t можно лишь в собственном контексте этого объекта - в контексте обрабатывающего воркера, либо процессора коллбэков, если сокет и работа с ним “переброшены” туда или ещё куда-нибудь.
DESCRIPTOR_TYPE_SOCKET_LISTENING
DESCRIPTOR_TYPE_QUEUE
Очередь данных в обычном FIFO буфере, реализовано через версию POSIX функции pipe(), её линукс варианта pipe2, POSIX очередей mqueue либо аналоги. Все в блокирующем режиме, так что в случае переполнения трубы отправка “зависнет”. Отличие от трубы в том, что передача идёт по пакетам - отправили 100 байт одним куском - на выходе из очереди так же одним куском появилось 100 байт. Выход из очереди пишется в fd поле, а вход в fd2 что может быть использовано потом для отправки данных в очередь. Очередь односторонняя, соответственно флаги записи/чтения смысла не имеют. Флаг чтения собственно по умолчанию взведён всегда и на событие EPOLLIN происходит считывание данных и передача их в коллбэк queue_callback
DAP_SOCK_QUEUE_PTR
Очередь может иметь взведённый флаг DAP**_SOCK_QUEUE_PTR в таком случае размер данных, пересылаемых по очереди, ограничен длиной указателя (на 32ух битных платформах это 4 байта, на 64 битных это 8 байт соответственно) и вызывается другой коллбэк queue_ptr_callback который передаёт только указатель без размера передаваемых данных. Сами данные через такую очередь не передаются, туда забрасывается только указатель, что удобно для передачи небольших
структур-сообщений типа dap_worker_msg_io_t****через который реализованы потоко-безопасные ф-ии ввода вывода
DESCRIPTOR_TYPE_PIPE
Обёртка вокруг POSIX pipe() в потоковом неблокирующем режиме.
DESCRIPTOR_TYPE_TIMER
Событие, случающееся по заранее заданному таймеру, создаётся через
DESCRIPTOR_TYPE_EVENT
Некоторое событие с внутренним флагом-счётчиком, куда можно выставить какое-то число, которое будет увеличивать внутренний счётчик. Соответственно при множественном вызове события счётчик будет увеличен последовательно всеми отправителями сигнала (через функцию dap_events_socket_event_signal), так как файловый дескриптор является блокируемым, а при обработке будет получен суммарный результат во втором параметре в коллбэке *event_callback(dap_events_socekt_t , uint64_r)
DESCRIPTOR_TYPE_FILE
При работе с файлами используется этот тип. В нём на событие EPOLLIN (далее события указываются только для ***epoll()***, для ***poll()***__это будет POLLIN для select’а это будет read_set и так далее) вызывается ***read()***__и затем, а при наличии флага записи выставляется реакция на событие EPOLLOUT и вызывается коллбэк write_callback а затем ***write()***__на все данные в буфере отправке. action_callback и все остальные его партнёры по union’у не используются
dap_events_socket_callbacks_t
Структура с перечислением коллбэков событийного сокета. Все коллбэки выполняются
в контексте потока-обработчика реактора
void () (dap_events_socket_t * a_esocket,void * a_arg)*
Коллбэк, инициируемый после создания событийного сокета и добавления его в потоки-обработчик реактора,выполняется в контексте потока-обработчикаодин раз, если далее произойдёт смена контекста - коллбэк не вызывается. В аргументе a_esocket****передаёт указатель на событийный сокет, a_arg****всегда NULL и не используется (depricated)
void () (dap_events_socket_t * a_esocket,void * a_arg)*
Вызывается при удалении событийного сокета. После вызова коллбэка удаляется память выделанная под сам событийный сокет, а так же данные его наследника, если они есть в поле ._inheritor. В аргументе a_esocket****передаёт указатель на событийный сокет, a_arg****всегда NULL и не используется (depricated)
void () (dap_events_socket_t * a_esocket,void * a_arg)*
Коллбэк, выполняемый после операции чтения, в нём должна происходить обработка входящих данных. В аргументе a_esocket****передаёт указатель на событийный сокет, a_arg****всегда NULL и не используется (depricated)
void () (dap_events_socket_t * a_esocket,void * a_arg)*
Коллбэк, выполняемый перед операцией записи, в нём должна происходить подготовка данных для отправки. В аргументе a_esocket****передаёт указатель на событийный сокет, a_arg****всегда NULL и не используется (depricated)
void () (dap_events_socket_t * a_esocket, int a_errnum)*
Обработчик ошибок, в аргументе a_esocket****передаёт указатель на событийный сокет, a_errnum****передаёт код ошибки
.worker_assign_callback
Вызывается после успешного попадания событийного сокета в контекст исполнения потока-обработчика реактора, в аргументе a_esocket****передаёт указатель на событийный сокет,
dap_events_socket_worker_callback_t worker_unassign_callback; // After successful worker unassign
Тут объединяются указатели на коллбэк действия, в зависимости от типа событийного сокета они могут иметь разные сигнатуры
void () (dap_events_socket_t * a_esocket, int a_new_sock, struct sockaddr* a_new_sock_addr)*
Вызывается после успешного приёма входящего соединения. a_esocket****указатель на событийный сокет, прослушивающий входящие соединения, a_new_sock****сокет нового входящего соединения, a_new_sock_addr****указатель на структуру с удаленным адресом нового входящего соединения
Timer callback for listening socket
Timer callback for listening socket
Queue callback for listening socket
Timer callback for listening socket
dap_events_socket_create_type_queue_ptr_unsafe
dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback);
Создаёт событийный сокет типа QUEUE_PTR****предназначенный для потоко-безопасной пересылки указателей через объект ядра типа “очередь”. Может иметь разные реализации, в частности на разных платформах и быть выполненным
как в виде нативных pipe2, в виде POSIX mqueue****, pipe****или socketpair, в виде чего-то виндоспецефичного, так и в самых грустных случаев в виде кольцевого буфера с rwlock’ом.
Принимаемые параметры:
● a_w указатель на dap_worker_t****на котором создаётся новый объект
● a_callback****указатель на функцию-обработчик входящих указателей из очереди,
синтаксис у функции void**** callback(dap_events_socket_t * a_esocket, void * a_ptr)****где a_esocket****это указатель на событийный сокет, возвращаемый функцией ****dap_events_socket_create_type_queue_ptr_unsafe, а a_ptr****это указатель, который приходит из очереди. Выполнятся коллбэк будет в собственном контексте dap_events_socket_t****в указанном для него a_w****воркере и должен использовать unsafeфункции для работы с собственными данными
Возвращаемое значение:
Указатель на событийный сокет, в контексте которого будет обрабатывать данные коллбэк.
Потокобезопасность:
Небезопасен, использовать только в собственном контексте воркера
dap_events_socket_create_type_queue_ptr_mt
dap_events_socket_t * dap_events_socket_create_type_queue_ptr_mt(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback);
dap_events_socket_queue_proc_input_unsafe
int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket);
dap_events_socket_create_type_event_unsafe
dap_events_socket_t * dap_events_socket_create_type_event_unsafe(dap_worker_t * a_w, dap_events_socket_callback_event_t a_callback);
dap_events_socket_create_type_event_mt
dap_events_socket_t * dap_events_socket_create_type_event_mt(dap_worker_t * a_w, dap_events_socket_callback_event_t a_callback);
dap_events_socket_event_proc_input_unsafe
*void dap_events_socket_event_proc_input_unsafe(dap_events_socket_t a_esocket);
dap_events_socket_create_type_pipe_unsafe
dap_events_socket_t * dap_events_socket_create_type_pipe_unsafe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags);
dap_events_socket_create_type_pipe_mt
dap_events_socket_t * dap_events_socket_create_type_pipe_mt(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags);
dap_events_socket_queue_ptr_send
int dap_events_socket_queue_ptr_send(dap_events_socket_t * a_es, void a_arg);*
dap_events_socket_event_signal
int dap_events_socket_event_signal(dap_events_socket_t * a_es, uint64_t a_value);
dap_events_socket_wrap_no_add
**dap_events_socket_t *dap_events_socket_wrap_no_add(dap_events_t a_events, int a_sock, dap_events_socket_callbacks_t a_callbacks);
dap_events_socket_wrap2
**dap_events_socket_t * dap_events_socket_wrap2(dap_server_t *a_server, struct dap_events a_events, int a_sock, dap_events_socket_callbacks_t a_callbacks);
dap_events_socket_assign_on_worker_unsafe
void dap_events_socket_assign_on_worker_unsafe(dap_events_socket_t * a_es, struct dap_worker * a_worker)
dap_events_socket_assign_on_worker_mt
void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct dap_worker * a_worker)
dap_events_socket_reassign_between_workers_mt
void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, dap_events_socket_t * a_es, dap_worker_t * a_worker_new)
dap_events_socket_find_unsafe
dap_events_socket_t * dap_events_socket_find_unsafe(int sock, struct dap_events * sh);
// Find client by socket
dap_events_socket_pop_from_buf_in
*size_t dap_events_socket_pop_from_buf_in(dap_events_socket_t sc, void * data, size_t data_size);
dap_events_socket_check_unsafe
bool dap_events_socket_check_unsafe(dap_worker_t * a_worker,dap_events_socket_t * a_es);
dap_events_socket_set_readable_unsafe
void dap_events_socket_set_readable_unsafe(dap_events_socket_t * sc,bool is_ready)
dap_events_socket_set_writable_unsafe
void dap_events_socket_set_writable_unsafe(dap_events_socket_t * sc,bool is_ready)
dap_events_socket_write_unsafe
*size_t dap_events_socket_write_unsafe(dap_events_socket_t sc, const void * data, size_t data_size);
dap_events_socket_write_f_unsafe
*size_t dap_events_socket_write_f_unsafe(dap_events_socket_t sc, const char * format,…);
dap_events_socket_remove_and_delete_unsafe
*void dap_events_socket_remove_and_delete_unsafe(dap_events_socket_t a_es, bool preserve_inheritor);
dap_events_socket_remove_from_worker_unsafe
*void dap_events_socket_remove_from_worker_unsafe(dap_events_socket_t a_es, dap_worker_t * a_worker);
dap_events_socket_set_readable_mt
void dap_events_socket_set_readable_mt(dap_worker_t * a_w, dap_events_socket_t * a_es,bool a_is_ready);
dap_events_socket_set_writable_mt
void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_t * a_es,bool a_is_ready);
dap_events_socket_write_mt
*size_t dap_events_socket_write_mt(dap_worker_t * a_w, dap_events_socket_t sc, const void * data, size_t data_size);
dap_events_socket_write_f_mt
*size_t dap_events_socket_write_f_mt(dap_worker_t * a_w, dap_events_socket_t sc, const char * format,…);
dap_events_socket_remove_and_delete_mt
void dap_events_socket_remove_and_delete_mt(dap_worker_t * a_w, dap_events_socket_t a_es)*
dap_events_socket_shrink_buf_in
void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size)