проблемы с использованием async_write сразу после async_read_until

Мой код выглядит следующим образом:

boost::asio::streambuf b1;

boost::asio::async_read_until(upstream_socket_, b1, '@',
           boost::bind(&bridge::handle_upstream_read, shared_from_this(),
           boost::asio::placeholders::error,
           boost::asio::placeholders::bytes_transferred));


void handle_upstream1_read(const boost::system::error_code& error,
                            const size_t& bytes_transferred)
  {
     if (!error)
     {

       async_write(downstream_socket_,
          b2,
          boost::bind(&bridge::handle_downstream_write,
          shared_from_this(),
          boost::asio::placeholders::error));  
     }
     else
        close();
  }

Согласно документации async_read_until, http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/reference/async_read_until/overload1.html, После успешного выполнения операции async_read_until streambuf может содержать дополнительные данные за пределами разделителя. Приложение обычно оставляет эти данные в streambuf для проверки последующей операцией async_read_until.

Я знаю, что streambuf может содержать дополнительные данные за пределами разделителя, но в моем случае он будет записывать эти дополнительные данные (данные за пределами char'@') в downstream_socket_ внутри операции async_write? Или будет ли функция async_write достаточно умной, чтобы не записывать эти дополнительные данные до тех пор, пока в следующий раз не будет вызвана функция handle_upstream1_read?

Согласно подходам в документации, данные в streambuf сначала сохраняются в istream ( std::istream response_stream(&streambuf); ), а затем помещаются в строку с помощью функции std::getline().

Мне действительно нужно сначала сохранить streambuf в istream, а затем преобразовать его в строку, а затем преобразовать обратно в массив символов (чтобы я мог отправить массив символов в downstream_socket_ ) вместо того, чтобы просто использовать async_write для записи данных ( до, но не включая разделитель '@' ) до downstream_socket_ ?

Я предпочитаю второй подход, так как мне не нужно делать несколько преобразований данных. Однако кажется, что что-то не так, когда я попробовал второй подход.

Мой идеальный случай таков:

  1. upstream_socket_ получил xxxx@yyyy с помощью async_read_until
  2. xxxx@ записывается в downstream_socket_
  3. upstream_socket_ получил zzzz@kkkk с помощью async_read_until
  4. yyyyzzzz@ записывается в downstream_socket_

Похоже, что операция async_write по-прежнему записывает данные за разделителем в downstream_socket_. (но я не уверен на 100% в этом)

Я ценю, если кто-нибудь может оказать небольшую помощь!


person hclee    schedule 14.02.2015    source источник


Ответы (1)


Используемая перегрузка async_write() считается завершенным, когда все streambuf данные, его входная последовательность, были записаны в WriteStream (сокет). Это эквивалентно вызову:

boost::asio::async_write(stream, streambuf,
    boost::asio::transfer_all(), handler);

Можно ограничить количество байтов, записываемых и потребляемых из объекта streambuf, вызвав этот async_write() с ссылкой boost::asio::transfer_exactly условие выполнения:

boost::asio::async_write(stream, streambuf, 
    boost::asio::transfer_exactly(n), handler);

В качестве альтернативы можно писать непосредственно из входной последовательности streambuf. Однако нужно будет явно использовать данные из streambuf.

boost::asio::async_write(stream,
    boost::asio::buffer(streambuf.data(), n), handler);
// Within the completion handler...
streambuf.consume(n);

Обратите внимание, что когда async_read_until() операция завершена, аргумент bytes_transferred обработчика завершения содержит количество байтов во входной последовательности streambuf до разделителя включительно, или 0, если произошла ошибка.


Вот полный пример, демонстрирующий использование обоих подходов. Пример написан с использованием синхронных операций в попытке упростить поток:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>

// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}

/// @brief Helper function that extracts a string from a streambuf.
std::string make_string(
  boost::asio::streambuf& streambuf,
  std::size_t n)
{
  return std::string(
      boost::asio::buffers_begin(streambuf.data()),
      boost::asio::buffers_begin(streambuf.data()) + n);
}

int main()
{
  using boost::asio::ip::tcp;
  boost::asio::io_service io_service;

  // Create all I/O objects.
  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
  tcp::socket server_socket(io_service);
  tcp::socket client_socket(io_service);

  // Connect client and server sockets.
  acceptor.async_accept(server_socket, boost::bind(&noop));
  client_socket.async_connect(acceptor.local_endpoint(), boost::bind(&noop));
  io_service.run();

  // Mockup write_buffer as if it read "xxxx@yyyy" with read_until()
  // using '@' as a delimiter.
  boost::asio::streambuf write_buffer;
  std::ostream output(&write_buffer);
  output << "xxxx@yyyy";
  assert(write_buffer.size() == 9);
  auto bytes_transferred = 5;

  // Write to server.
  boost::asio::write(server_socket, write_buffer,
      boost::asio::transfer_exactly(bytes_transferred));
   // Verify write operation consumed part of the input sequence.
  assert(write_buffer.size() == 4);

  // Read from client.
  boost::asio::streambuf read_buffer;
  bytes_transferred = boost::asio::read(
      client_socket, read_buffer.prepare(bytes_transferred));
  read_buffer.commit(bytes_transferred);

  // Copy from the read buffers input sequence.
  std::cout << "Read: " << 
               make_string(read_buffer, bytes_transferred) << std::endl;
  read_buffer.consume(bytes_transferred);

  // Mockup write_buffer as if it read "zzzz@kkkk" with read_until()
  // using '@' as a delimiter.
  output << "zzzz@kkkk";
  assert(write_buffer.size() == 13); 
  bytes_transferred = 9; // yyyyzzzz@

  // Write to server.
  boost::asio::write(server_socket, buffer(write_buffer.data(),
      bytes_transferred));
  // Verify write operation did not consume the input sequence.
  assert(write_buffer.size() == 13);
  write_buffer.consume(bytes_transferred);

  // Read from client.
  bytes_transferred = boost::asio::read(
      client_socket, read_buffer.prepare(bytes_transferred));
  read_buffer.commit(bytes_transferred);

  // Copy from the read buffers input sequence.
  std::cout << "Read: " << 
               make_string(read_buffer, bytes_transferred) << std::endl;
  read_buffer.consume(bytes_transferred);
}

Выход:

Read: xxxx@
Read: yyyyzzzz@

Несколько других заметок:

  • streambuf владеет памятью, а std::istream и std::ostream используют память. Использование потоков может быть хорошей идеей, когда нужно извлечь форматированный ввод или вставить форматированный вывод. Например, когда кто-то хочет прочитать строку "123" как целое число 123.
  • Можно напрямую получить доступ к входной последовательности streambuf и выполнить итерацию по ней. В приведенном выше примере я использую boost::asio::buffers_begin() чтобы помочь создать std::string путем повторения входной последовательности streambuf.

    std::string(
        boost::asio::buffers_begin(streambuf.data()),
        boost::asio::buffers_begin(streambuf.data()) + n);
    
  • Используется поточный транспортный протокол, поэтому обрабатывайте входящие данные как поток. Имейте в виду, что даже если промежуточный сервер рефреймирует сообщения и отправляет "xxxx@" в одной операции записи и "yyyyzzzz@" в последующей операции записи, нисходящий поток может прочитать "xxxx@yyyy" в одной операции чтения.

person Tanner Sansbury    schedule 14.02.2015
comment
Спасибо за подробное и исчерпывающее объяснение, Таннер. transfer_exactly(n) и streambuf.consume(n) — это именно то, что мне нужно! Причина, по которой я застрял так долго, заключается в том, что я пытался использовать один символ «@» в качестве символа конечного тега в данных моего видеопотока, что является полностью идеей дампа. - person hclee; 18.02.2015
comment
В примере используется write(), который, как я предполагаю, является синхронным boost::asio::write(), верно? - person Dronz; 07.03.2015
comment
@Dronz Да. Поиск, зависящий от аргумента, заставил компилятор выбрать boost::asio::read(), boost::asio::write(), boost::asio::buffers_begin(), потому что один из их arguments также находился в пространстве имен boost::asio. Несмотря на это, я изменил пример, чтобы сделать его более явным. - person Tanner Sansbury; 07.03.2015
comment
Спасибо! Я заметил, что могу скомпилировать эти строки даже без использования boost::asio::ip::tcp; - person Dronz; 07.03.2015