Перемещение данных в медиастримере осуществляется с помощью очередей, описанных структурой queue_t. По очередям движутся процессии сообщений типа mblk_t, сообщения сами по себе не содержат блоков данных, а только ссылки на предыдущее, следующее сообщение и на блок данных. Кроме того, хочу особо подчеркнуть, также есть поле для ссылки на однотипное сообщение, что позволяет организовать сообщения в односвязный список. Группа сообщений, объединенных таким списком, будет называться кортежем. Таким образом, любой элемент очереди может быть отдельным сообщением mblk_t или, возможно, заголовком кортежа сообщений mblk_t. Каждое сообщение в кортеже может иметь свой собственный блок данных. Мы обсудим, зачем нужны кортежи, чуть позже.

Как упоминалось выше, само сообщение не содержит блока данных; вместо этого он содержит только указатель на область памяти, где хранится блок. В этой части общая картина работы медиастримера напоминает склад дверей в мультфильме «Корпорация монстров».

Теперь, продвигаясь вверх по иерархии, подробно рассмотрим перечисленные сущности механизма передачи данных в медиастримере.

5.1 Блок данных dblk_t

Блок данных состоит из заголовка и буфера данных. Заголовок описывается структурой dblk_t,

Листинг 5.1: Структура dblk_t

typedef struct datab
{
  unsigned char *db_base; /* Pointer to the beginning of the data buffer. */
  unsigned char *db_lim;  /* Pointer to the end of the data buffer. */
  void (*db_freefn)(void*); /* The function of freeing memory when deleting a block. */
  int db_ref; /* Reference counter. */
} dblk_t;

Поля структуры содержат указатели на начало буфера, конец буфера и функцию удаления буфера данных. Последний элемент в заголовке — это счетчик ссылокdb_ref , если он достигает нуля, это служит сигналом к ​​удалению этого блока из памяти. Если блок данных был создан функцией datab_alloc(), то буфер данных будет помещен в память сразу после заголовка. Во всех остальных случаях буфер может располагаться где-то отдельно. Буфер данных будет содержать образцы сигналов или другие данные, которые мы хотим обработать с помощью фильтров.

Новый экземпляр блока данных создается с помощью функции:

dblk_t *datab_alloc(int size);

В качестве входного параметра ему передается размер данных, которые будет хранить блок. Выделяется больше памяти для размещения заголовка — структуры datab в начале выделенной памяти. Но при использовании других функций это происходит не всегда, в некоторых случаях буфер данных может располагаться отдельно от заголовка блока данных. Во время создания поля структуры настраиваются таким образом, чтобы ее поле db_base указывало на начало области данных, а db_lim — на ее конец. Счетчик ссылок db_ref установлен равным единице. Указатель функции очистки данных установлен на ноль.

5.2 Сообщение mblk_t

Как уже упоминалось, элементы очереди имеют тип mblk_t, он определяется следующим образом:

Листинг 5.2: Структура mblk_t

typedef struct msgb
{
  struct msgb *b_prev;   // Pointer to the previous item in the list.
  struct msgb *b_next;   // Pointer to the next item in the list.
  struct msgb *b_cont;   // Pointer for “gluing” other messages to a message to create a message tuple.
  struct datab *b_datap; // Pointer to the structure of the data block.
  unsigned char *b_rptr; // Pointer to the beginning of the data area to read data from the b_datap buffer.
  unsigned char *b_wptr; // Pointer to the beginning of the data area to write data to the b_datap buffer.
  uint32_t reserved1;    // Reserved field1, media streamer puts service information there.
  uint32_t reserved2;    // Reserved field2, media streamer places service information there.
  #if defined(ORTP_TIMESTAMP)
  struct timeval timestamp;
  #endif
  ortp_recv_addr_t recv_addr;
} mblk_t;

Структура mblk_t в начале содержит указатели b_prev, b_next, необходимые для организации двусвязного списка (который является queue_t очередь).

Затем идет указатель b_contb_cont, который используется только тогда, когда сообщение входит в кортеж. Для последнего сообщения в кортеже этот указатель остается нулевым.

Далее мы видим указатель на блок данных b_datap, для которого существует сообщение. За ним следуют указатели на область внутри буфера данных блока. Поле b_rptr указывает место, из которого будут считаны данные из буфера. Поле b_wptr указывает место, из которого следует производить запись в буфер.

Остальные поля носят служебный характер и не относятся к работе механизма передачи данных.

На рисунке 5.1 показано одно сообщение с именем m1 и блок данных d1.

На картинке 5.2 изображен кортеж из трех сообщений m1, m1_1, m1_2.

5.2.1 Функции для работы
с mblk_t

Новое сообщение mblk_t создается функцией allocb():

mblk_t *allocb(int size, int pri);

размещает в памяти новое сообщение mblk_t с блоком данных заданного размера, второй аргумент — pri в этой версии не используется библиотека. На его место нужно подставить макрос BPRI_MED (после расширения макроса туда будет подставлен ноль). В процессе работы функции будет выделена память под структуру нового сообщения и будет вызвана функция mblk_init(), которая обнулит все поля созданного экземпляра структуры и затем, используя вышеприведенный datab_alloc(), создаст буфер данных. После этого поля в структуре будут настроены:

mp->b_datap = datab;
mp->b_rptr = mp->b_wptr = datab->db_base;
mp->b_next = mp->b_prev = mp->b_cont = NULL;

На выходе получаем новое сообщение с инициализированными полями и пустым буфером данных. Чтобы добавить данные в сообщение, вам нужно скопировать его в буфер блока данных:

memcpy(msg->b_rptr, data, size);

указатель источника данных данных;размерразмер данных.

Затем нужно обновить указатель на точку записи, чтобы он снова указывал на начало свободной области в буфере:

msg->b_wptr = msg->b_wptr + size

Если вы хотите создать сообщение из существующего буфера, не копируя его, используйте функцию esballoc():

mblk_t *esballoc(uint8_t *buf, int size, int pri, void (*freefn)(void*));

После создания сообщения и структуры блока данных он установит свои указатели на данные по адресу buf. Те. в этом случае буфер данных не располагается после полей заголовка блока данных, как это было при создании блока данных функцией datab_alloc(). Буфер с данными, переданными в функцию, останется там, где был, но с помощью указателей будет подстегнут к только что созданному заголовку блока данных, а тот, соответственно, к сообщению.

Несколько блоков данных могут быть объединены в одно сообщение mblk_t. Это делается с помощью функции appendb():

mblk_t * appendb(mblk_t *mp, const char *data, int size, bool_t pad);

сообщение mpa, к которому будет добавлен еще один блок данных; указатель данных на блок, копия которого будет добавлена ​​к сообщению; размер данных; padflag о том, что размер выделенной памяти должен быть выровнен по 4-байтовой границе (padding будем делать с нулями).

Если в существующем буфере данных сообщений достаточно места, новые данные будут вклеены за уже существующие данные. Если свободное место в буфере данных сообщения меньше размера, то создается новое сообщение с достаточным размером буфера, и данные копируются в его буфер. Это новое сообщение, присоединенное к исходному с помощью указателя b_cont. В этом случае сообщение становится кортежем.

Если вам нужно добавить в кортеж еще один блок данных, то вам нужно использовать функцию msgappend():

void msgappend(mblk_t *mp, const char *data, int size, bool_t pad);

он найдет последнее сообщение в кортеже (такое сообщение будет иметь нулевое значение b_cont) и вызовет функцию appendb() для этого сообщения.

Узнать размер данных в сообщении или кортеже можно с помощью функции msgdsize():

int msgdsize(const mblk_t *mp);

он будет перебирать все сообщения в кортеже и возвращать общий объем данных в буферах данных этих сообщений. Для каждого сообщения количество данных рассчитывается следующим образом:

mp->b_wptr - mp->b_rptr

Чтобы объединить два кортежа, используйте функцию concatb():

mblk_t *concatb(mblk_t *mp, mblk_t *newm);

он объединяет кортеж newm с хвостом mp и возвращает указатель на последнее сообщение результирующего кортежа.

При необходимости кортеж можно превратить в одно сообщение с одним блоком данных, это делает функция msgpullup() :

void msgpullup(mblk_t *mp,int len);

если len равно -1, то размер выделенного буфера определяется автоматически. Если len — положительное число, то будет создан буфер такого размера и в него будут скопированы данные сообщения кортежа. Если место в буфере закончилось, копирование на этом остановится. Первое сообщение кортежа получит буфер нового размера со скопированными данными. Остальные сообщения будут удалены, а память возвращена в кучу.

При удалении структуры mblk_t учитывается счетчик ссылок блока данных, если при вызове freeb() он оказывается равным нулю, то буфер данных удален вместе с экземпляром mblk_t, указывающим на него.

Инициализация новых полей сообщений mblk_init():

void mblk_init(mblk_t *mp);

Добавляем еще один фрагмент данных в сообщение appendb():

mblk_t * appendb(mblk_t *mp, const char *data, size_t size, bool_t pad);

Если новые данные не помещаются в свободное место буфера данных сообщения, то к сообщению прикрепляется отдельно созданное сообщение с буфером необходимого размера (указатель на добавляемое сообщение устанавливается в первом сообщении), сообщение превращается в кортеж.

Добавление блока данных в кортеж msgappend():

void msgappend(mblk_t *mp, const char *data, size_t size, bool_t pad);

Функция вызывает appendb() в цикле.

Объединение двух кортежей в один concatb():

mblk_t *concatb(mblk_t *mp, mblk_t *newm);

Сообщение newm будет добавлено к mp.

Создание копии одного сообщения copyb():

mblk_t *copyb(const mblk_t *mp);

Полная копия кортежа со всеми блоками данных copymsg():

mblk_t *copymsg(const mblk_t *mp);

Элементы кортежа копируются функцией copyb().

Создайте упрощенную копию сообщения mblk_t. При этом блок данных не копируется, а увеличивается счетчик db_ref его ссылок:

mblk_t *dupb(mblk_t *mp);

Создание облегченной копии кортежа. Блоки данных не копируются, увеличивается только их счетчик ссылок db_ref:

mblk_t *dupmsg(mblk_t* m);

Объединение всех сообщений кортежа в одно сообщение msgpullup():

void msgpullup(mblk_t *mp,size_t len);

Если len равно -1, размер выделенного буфера будет установлен автоматически.

Удалить сообщение или кортежfreemsg():

void freemsg(mblk_t *mp);

Счетчик ссылок блока данных уменьшается на единицу. Если при этом он достигает нуля, то блок данных также удаляется.

Подсчитывает общий объем данных в сообщении или кортеже.

size_t msgdsize(const mblk_t *mp);

Копирование содержимого зарезервированных полей одного сообщения в другое сообщение (на самом деле эти поля содержат флаги, которые использует медиастример):

mblk_meta_copy(const mblk_t *source, mblk *dest);

5.3 Очередь queue_t

Очередь сообщений в медиа-стримере реализована в виде циклического двусвязного списка. Каждый элемент списка содержит указатель на блок данных с отсчетами сигналов. Получается, что поочередно перемещаются только указатели на блок данных, а сами данные остаются неподвижными. Структура, описывающая очередь queue_t, показана ниже:

Листинг 5.3: Структура queue_t

typedef struct _queue
{
   mblk_t _q_stopper; /* Idle queue element, does not point to data, is used only for queue management. When initializing a queue(qinit()) its pointers are configured to point to itself. */
   int q_mcount;        /* Number of items in the queue. */
} queue_t;

Структура содержит поле — указатель _q_stopper типа *mblk_t, он указывает на первый элемент (сообщение) в очереди. Второе поле структуры — счетчик сообщений в очереди. Ниже на рисунке 5.3 очередь с именем q1, содержащая 4 сообщения m1,m2, m3, m4.

На следующем рисунке 5.4 очередь с именем q1, содержащая 4 сообщения m1, m2, m3, m4, где m2 — кортеж из двух сообщений. Сообщение m2 является заголовком кортежа, содержащего еще два сообщения m2_1 и m2_2.

5.3.1 Функции для
работы с queue_t

Инициализация очереди:

void qinit(queue_t *q);

Поле _q_stopper (далее будем называть его «стопор») инициализируется функцией mblk_init(), его указатель на предыдущий элемент и следующий элемент устанавливаются так, чтобы они указывают на себя. Счетчик элементов очереди обнуляется.

Добавление нового элемента (сообщения):

void putq(queue_t *q, mblk_t *m);

В конец списка добавляется новый элемент m, указатели элементов настраиваются таким образом, чтобы стопор становился для него следующим элементом, а он — предыдущим для стопора. Счетчик элементов очереди увеличивается.

Получение элемента из очереди:

mblk_t * getq(queue_t *q);

сообщения после извлечения стопора счетчик элемента уменьшается. Если в очереди нет элементов, кроме стопора, то возвращается 0.

Вставка сообщения в очередь:

void insq(queue_t *q, mblk_t *emp, mblk_t *mp);

Элемент mp вставляется перед элементом emp. Если emp равно 0, сообщение добавляется в конец очереди.

Извлечение сообщения из головы очереди:

void remq(queue_t *q, mblk_t *mp);

Счетчик элементов уменьшается на 1.

Чтение указателя на первый элемент в очереди:

mblk_t * peekq(queue_t *q);

Извлечение сообщения из хвоста очереди:

mblk_t *ms_queue_peek_last (queue_t *q);

Удаление всех элементов из очереди при удалении самих элементов:

void flushq(queue_t *q, int how);

Аргумент как не используется. Счетчик элементов очереди обнуляется.

Макрос для чтения указателя на последний элемент очереди:

mblk_t * qlast(queue_t *q);

При работе с очередями сообщений помните, что при вызове ms_queue_put (q, m) с нулевым указателем на сообщение функция зациклится. Ваша программа зависнет. Функция ms_queue_next (q, m) ведет себя аналогично.

5.3.2 Подключение фильтров

Описанные выше очереди используются для передачи сообщений от одного фильтра к другому или от одного к нескольким фильтрам. Фильтры и их соединения друг с другом образуют ориентированный граф. Вход или выход фильтра будем называть обобщающим словом «вывод». Для описания порядка соединений между фильтрами медиастример использует понятие «сигнальная точка». Сигнальная точка представляет собой структуру MSCPoint , которая содержит указатель на фильтр и номер одного из его выводов, соответственно она описывает подключение одного из входов или выходов фильтра.

Сигнальная точка
графика обработки данных

Листинг 5.4: Структура MSCPoint

typedef struct _MSCPoint
{
  struct _MSFilter *filter; /* Pointer to the media streamer filter. */
  int pin;                 /* The number of one of the filter inputs or outputs, i.e. pin. */
} MSCPoint;

Штыри фильтра нумеруются, начиная с нуля. Соединение двух выводов с очередью сообщений описывается структурой MSQueue, которая содержит очередь сообщений и указатели на две сигнальные точки, которые она соединяет:

typedef struct _MSQueue
{
  queue_t q;
  MSCPoint prev;
  MSCPoint next;
}MSQueue;

Мы назовем эту структуру связь сигнала. Каждый фильтр медиастримера содержит таблицу входных и таблицу выходных ссылок сигналов (MSQueue). Размер таблиц задается при создании фильтра, мы уже делали это с помощью экспортируемой переменной типа MSFilterDesc при разработке собственного фильтра в главе 4. Ниже, в листинге 5.5, показана структура, описывающая любой фильтр в медиастримере, MSFilter:

Листинг 5.5: Структура MSFilter

struct _MSFilter{
    MSFilterDesc *desc;    /* Pointer to filter descriptor. */
    /* Protected attributes, they cannot be shifted or removed, otherwise the work with plugins will be broken. */
    ms_mutex_t lock;      /* Mutex. */
    MSQueue **inputs;     /* Input links table. */
    MSQueue **outputs;    /* Output links table. */
    struct _MSFactory *factory; /* Pointer to the factory that created this filter instance. */
    void *padding;              /* Not used, will be used if protected fields are added. */
    void *data;                 /* Pointer to an arbitrary structure for storing data for the internal state of the filter and intermediate calculations. */
    struct _MSTicker *ticker;   /* A pointer to the ticker object, which must not be null when the function is called process(). */
    /*private attributes, they can be moved and changed at any time*/
    MSList *notify_callbacks; /* List of callbacks used to handle filter events. */
    uint32_t last_tick;       /* Last measure number when the call   of process() was made. */ 
    MSFilterStats *stats;     /* Filter statistics.*/
    int postponed_task; /* Number of pending tasks. Some filters may delay data processing (call process()) for several measures.*/
    bool_t seen;  /* The flag used by the ticker to indicate that it has already served this filter instance at this clock cycle.*/
};
typedef struct _MSFilter MSFilter;

После того, как мы подключили фильтры в программе на языке C в соответствии с нашим замыслом (но не подключили тикер), мы тем самым создали ориентированный граф, узлы которого являются экземплярами структуры MSFilter, а ребра являются экземплярами сигнальных ссылок MSQueue.

5.4. Закулисная
активность бегущей строки

Когда я сказал вам, что тикер — это фильтр источника часов, это не было всей правдой. Тикер — это объект, который ежечасно выполняет функции process() всех фильтров, к которым он прямо или косвенно подключен. Когда мы подключаем тикер к графическому фильтру в программе на C, мы показываем тикеру график, которым он будет управлять с этого момента, пока мы его не отключим. После подключения тикер начинает изучать доверенный ему график, составляя список фильтров, которые он включает. Чтобы не «считать» один и тот же фильтр дважды, он помечает обнаруженные фильтры, устанавливая флаг увидел. Поиск осуществляется по таблицам сигнальных звеньев, которые есть у каждого фильтра.

Во время ознакомительного тура по графику тикер проверяет, есть ли среди фильтров хотя бы один фильтр, выступающий в роли источника блоков данных. Если таковых не найдено, то график признается неверным и тикер вылетает из программы.

Если граф оказался «правильным», для каждого найденного фильтра вызывается функция preprocess() для его инициализации. Как только наступает момент следующего цикла обработки (по умолчанию каждые 10 миллисекунд), тикер вызывает функцию process() для всех ранее найденных исходных фильтров, а затем и для остального списка фильтры. Если у фильтра есть входные ссылки, то функция process() повторяется до тех пор, пока очереди входных ссылок не опустеют. После этого бегунок переходит к следующему фильтру в списке и «прокручивает» его до тех пор, пока входные ссылки не освободятся от сообщений. Бегущая строка перемещается от фильтра к фильтру, пока список не будет прокручен. На этом обработка меры завершена.

Теперь вернемся к кортежам и поговорим о том, зачем такая сущность была добавлена ​​в медиастример. В общем случае объем данных, необходимых алгоритму, работающему внутри фильтра, не совпадает и не кратен размеру входных буферов данных. Например, мы пишем фильтр, выполняющий быстрое преобразование Фурье, которое по определению может обрабатывать только блоки данных размером, равным степени двойки. Пусть это будет 512 отсчетов. Если данные формируются по телефонному каналу, то буфер данных каждого сообщения на входе принесет нам 160 отсчетов сигнала. Есть соблазн не брать данные со входа, пока в сигнальном звене не будет нужного количества данных. Но в этом случае произойдет коллизия с тикером, который будет безуспешно пытаться прокрутить фильтр до тех пор, пока входная ссылка не станет пустой. Ранее мы назвали это правило третьим принципом фильтра. В соответствии с этим принципом функция process() фильтра должна забирать все данные из входных очередей.

Кроме того, от входа нельзя будет забрать только 512 сэмплов, так как можно будет забрать только целые блоки, т.е. фильтр должен будет взять 640 сэмплов и использовать 512 из них, а остаток хранить до новой порции данные накапливаются. Таким образом, наш фильтр, помимо своей основной работы, должен обеспечивать вспомогательные действия по промежуточному хранению входных данных. Разработчики медиастримера для решения этой общей проблемы разработали специальный объект — MSBufferizer (буферизатор), который решает эту задачу с помощью кортежей.

5.5 Буферизация объекта
MSBufferizer

Это объект, который может накапливать входные данные внутри фильтра и начинать отдавать их на обработку, как только количество информации становится достаточным для включения алгоритма фильтра. Пока буфер накапливает данные, фильтр будет работать в режиме ожидания, не потребляя вычислительную мощность процессора. Но как только функция чтения из буфера возвращает ненулевое значение, функция process() фильтра начинает брать данные из буфера и обрабатывать данные порциями нужного размера, пока они истощены. Невостребованные данные остаются в буфере в качестве первого элемента кортежа, к которому присоединяются последующие блоки новых входных данных.

Структура MSBufferizer, описывающая буферизацию, показана в листинге 5.6.

Листинг 5.6: Структура MSBufferizer

struct _MSBufferizer
{
  queue_t q; /* Message queue. */
  int size; /* The total size of the data currently in the buffer. */
};
typedef struct _MSBufferizer MSBufferizer;

5.5.1 Функции для работы
с MSBufferizer

Создание нового экземпляра буферизатора:

MSBufferizer * ms_bufferizer_new(void);

Выделяется память, инициализируется в ms_bufferizer_init() и возвращается указатель.

Функция инициализации:

void ms_bufferizer_init(MSBufferizer *obj);

Очередь q инициализируется, поле size устанавливается равным нулю.

Добавление сообщения:

void ms_bufferizer_put(MSBufferizer *obj, mblk_t *m);

Сообщение m добавляется в очередь. Вычисленный размер блоков данных добавляется к size.

Буферизация всех сообщений в очереди данных ссылки q:

void ms_bufferizer_put_from_queue(MSBufferizer *obj, MSQueue *q);

Перенос сообщений из ссылки q в буфер выполняется с помощью функции ms_bufferizer_put().

Чтение из буфера:

int ms_bufferizer_read(MSBufferizer *obj, uint8_t *data, int datalen);

Если размер накопленных в буферизаторе данных оказывается меньше запрошенного (datalen), функция возвращает ноль, и данные не копируются в data . В противном случае данные последовательно копируются из кортежей в буфер. После копирования кортеж удаляется, а память освобождается. Копирование заканчивается в тот момент, когда скопировано datalen байт. Если пробел в data заканчивается среди исходного блока данных, то в этом сообщении он будет сокращен до оставшейся, еще не скопированной части. При следующем звонке копирование продолжится с этой точки.

Чтение объема данных, которые в настоящее время доступны в буферизаторе:

int ms_bufferizer_get_avail(MSBufferizer *obj);

Возвращает поле size буферизатора.

Отбрасывание некоторых данных в буферизаторе:

void ms_bufferizer_skip_bytes(MSBufferizer *obj, int bytes);

Указанное количество байтов данных извлекается и отбрасывается. Самые старые данные отбрасываются.

Удаление всех сообщений в буферизаторе:

void ms_bufferizer_flush(MSBufferizer *obj);

Счетчик данных обнуляется.

Удаление всех сообщений в буферизаторе:

void ms_bufferizer_uninit(MSBufferizer *obj);

Счетчик не сбрасывается.

Удаление буферизатора и освобождение памяти:

void ms_bufferizer_destroy(MSBufferizer *obj);

Примеры использования буферизации можно найти в исходном коде нескольких фильтров медиастримеров. Например, в фильтре MS_L16_ENC, который меняет местами байты в выборках из сетевого порядка в хостовый: l16.c

В следующей главе мы рассмотрим фильтры отладки.

P.S.

Если вам неинтересно, как легко рисовать причудливые картинки вроде 5.1–5.4, то можете прочитать мою статью: Синергия Graphviz и препроцессора C/C++