в netty мы можем только записывать и получать данные меньше 1024 байт: как мы можем писать или получать больше?

При записи 2048 байтов в обработчик метод messageRevieved должен вызываться дважды, чтобы получить все данные... как я могу получить данные 2048 байт в

Код

Сервер:

public class Server{
    public static void main(String[] args){
        ChannelFactory factory=new NioServerSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool());
        ServerBootstrap bootstrap=new ServerBootstrap(factory);
        bootstrap.setPipelineFactory(new CarPipelineFactory());

        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);

        bootstrap.bind(new InetSocketAddress(8989));
    }
}

Обработчик сервера:

public class ServerHandler extends SimpleChannelHandler{

    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e){
        byte[] resp=data.getBytes();//data is a String greater than 1024bytes;
        ChannelBuffer buffer=ChannelBuffers.buffer(resp.length);
        buffer.writerBytes(resp);
        e.getChannel().write(buffer);
        buffer.clear();
    }
}

Клиент:

public class Client{
    public static void main(String[] args){
        ChannelFactory channelFactory=new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool());
        ClientBootstrap bootstrap=new ClientBootstrap(channelFactory);
        bootstrap.getPipeline().addLast("handler", new PhoneClientHandler());

        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);

        bootstrap.connect(new InetSocketAddress("127.0.0.1",8181));
    }
}

Клиентский обработчик:

public class ClientHandler extends SimpleChannelHandler{
    public void messageRecieved(ChannelHandlerContext ctx, ChannelStateEvent e){
        ChannelBuffer buffer=(ChannelBuffer)e.getMessage();
        int size=buffer.readableBytes();
        byte[] bytes=new byte[size];
        buffer.readBytes(bytes);
        buffer.clear();
        System.out.println(new String(bytes));//if the data size>1024,the String will speprate into parts.
    }
}

person Gofier    schedule 16.04.2012    source источник
comment
Извините, я не понимаю вопроса. Можете ли вы попытаться быть более конкретным?   -  person Norman Maurer    schedule 16.04.2012
comment
извините, я китаец, и мой английский не очень хорош. ниже мой вопрос: поскольку я использую netty, когда я пишу 2048 байтов (больше 1024 байтов) данных из одной руки в другую, принимающая рука должна получать дважды, как я могу просмотреть все данные (больше 1024 байт) за один раз ?   -  person Gofier    schedule 17.04.2012


Ответы (6)


Ну, вы всегда можете решить, сколько байтов записывать за раз, но вы точно никогда не знаете, когда и сколько байтов будет получено (вот почему NIO имеет смысл). Вам нужно обработать свой собственный буфер для получения фиксированного количества байтов, которое вы хотите. Для этого вы можете использовать FrameDecoder, который предназначен для этой цели.

Кроме того, вы можете убедиться, что данные не остаются слишком долго в буфере сокета отправителя, установив для параметра tcpNoDelay значение true, поэтому он не будет ждать, пока текущий «кадр» достигнет определенного критического размера, прежде чем физически отправка данных.

Если я правильно понимаю, вы пишете, скажем, 2048 байт в одной руке, но не все данные получены в событии messagedReceived с другой стороны? Попробуйте проверить следующие распространенные проблемы:

  • ваше приложение завершается слишком рано, а данные еще не получены
  • ваши данные застряли в буфере сокета «отправителя», потому что вы не закрыли канал и для параметра tcpNoDelay не было установлено значение true. Это заставляет сокет ждать некоторых дополнительных байтов перед отправкой пакета.
  • вы не прочитали все данные внутри ChannelBuffer, но по какой-то причине readerIndex был установлен в другую позицию

Попробуйте показать нам часть вашего кода, это должно упростить задачу...

ДОБАВЛЕНО 17.04.2012

Насколько я понимаю, вы пытаетесь передать кодировку массива байтов для строки от отправителя к получателю. Вот ваш код после небольшого рефакторинга:

----------------------------код --------------------- ------- напишите от руки: response.size()>1024bytes

byte[] datas = ((String)msg).getBytes("UTF-8"); //ALWAYS SPECIFY THE ENCODING
ChannelBuffer buffer = ChannelBuffers.wrap(datas); //USE DIRECTLY THE ARRAY
System.out.println(buffer);    //buffer'size>1024 here
channel.write(buffer);

----------------------------получить руку: должен получить дважды, println() будет выполняться дважды

ChannelBuffer buffer = (ChannelBuffer) event.getMessage(); 
System.out.println(buffer)    //buffer'size once 1024,once the remainder size
byte[] datas =buffer.readBytes(buffer.readableBytes()).array()
String msg=new String(datas , "UTF-8"); //BAD IDEA because the bytes sequence of the last UTF-8 char could be uncompleted there
System.out.println(str);

Это не тот способ, вместо этого вы должны использовать непосредственно StringEncoder и StringDecoder в пакете org.jboss.netty.handler.codec.string< /сильный>. Он справится с проблемой кадрирования для вас. Если вы все еще хотите отладить свой код, используйте LoggingHandler, предоставленный Netty. Также вы действительно установили эту опцию:

bootstrap.setOption("tcpNoDelay", true);

в обе стороны бутстрапы?

person Renaud    schedule 16.04.2012
comment
прежде всего, спасибо за ответ на мой вопрос, мне очень жаль, что мой английский не очень хорош. да, как вы сказали, когда я пишу 2048 байт в обработчике, метод messageRevieved должен вызываться дважды, чтобы получить все данные. .. Я покажу свой код прямо сейчас, еще раз спасибо. - person Gofier; 17.04.2012
comment
Спасибо большое! NumRenaud, я изменил свой код, как вы предложили, но проблема осталась, я уверен, что установил опцию bootstrap.setOption(tcpNoDelay,true) с обеих сторон, я также попробовал child.tcpNoDelay и использую SimpleChannelHandler с обеих сторон . - person Gofier; 17.04.2012
comment
bootstrap.setOption(tcpNoDelay,true) на клиенте и bootstrap.setOption(child.tcpNoDelay,true) на сервере - person Renaud; 17.04.2012
comment
Не могу помочь вам больше с этой небольшой информацией, которую вы даете... вам придется поместить весь код для вашего обработчика и вашего BootStrap, чтобы воспроизвести проблему! - person Renaud; 17.04.2012
comment
спасибо, что обратили внимание на мой вопрос, я прикрепил весь свой код выше, ожидайте вашего ответа, большое спасибо! - person Gofier; 18.04.2012

Попробуйте использовать TruncatedChannelBuffer или BigEndianHeapChannelBuffer вместо буфера каналов в ClientHandler. Я думаю, что это сработает ... или, если это не сработает, пожалуйста, опубликуйте трассировку стека сгенерированного исключения. Я попробовал это в своем коде, и это сработало. Надеюсь, это поможет вам.

public void messageReceived(ChannelHandlerContext channelHandlerContext,MessageEvent messageEvent) throws Exception {

    Object messageObject = messageEvent.getMessage();

    // if size of message < 1024 then TruncatedChannelBuffer is returned.

    if (messageObject instanceof TruncatedChannelBuffer) {

        try {

            TruncatedChannelBuffer truncatedChannelBuffer = (TruncatedChannelBuffer) messageObject;

            byte[] byteArray = new byte[truncatedChannelBuffer.readableBytes()];

            truncatedChannelBuffer.readBytes(byteArray);

            System.out.print(" Message = "+new String(byteArray));

            truncatedChannelBuffer.clear();

        } catch (Exception e) {

            System.out.println("Exception in MessageReceived...");

            e.printStackTrace();


        }
    }
    // if size of message > 1024 then BigEndianHeapChannelBuffer is returned.

    if (messageObject instanceof BigEndianHeapChannelBuffer) {

        try {

            BigEndianHeapChannelBuffer bigEndianHeapChannelBuffer = (BigEndianHeapChannelBuffer) messageObject;

            byte[] byteArray  = new byte[bigEndianHeapChannelBuffer.readableBytes()];

            bigEndianHeapChannelBuffer.readBytes(byteArray);

            System.out.print(" Message = "+new String(byteArray));

            bigEndianHeapChannelBuffer.clear();


        } catch (Exception e) {

            System.out.println("Exception in MessageReceived...");

            e.printStackTrace();

        }
    }

}       
person prashant khunt    schedule 07.07.2012

Прежде всего, для клиента параметр начальной загрузки не должен начинаться с «дочернего»:

bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);

Также вы не используете один и тот же порт на клиенте и сервере!!

Во-вторых, у вас нет «закрытой» стратегии: когда ваш клиент должен знать, что его работа выполнена? Как предотвратить преждевременное завершение темы? Вы должны сделать это

ОБРАБОТЧИК СЕРВЕРА

public class ServerHandler extends SimpleChannelHandler{

    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e){
        byte[] resp=data.getBytes();//data is a String greater than 1024bytes;
        ChannelBuffer buffer=ChannelBuffers.buffer(resp.length);
        buffer.writerBytes(resp);
        e.getChannel().write(buffer);
        buffer.clear();
        e.getChannel.close();
    }
}

ЗАГРУЗКА КЛИЕНТА

public class Client{
    public static void main(String[] args){
        ChannelFactory channelFactory=new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool());
        ClientBootstrap bootstrap=new ClientBootstrap(channelFactory);
        bootstrap.getPipeline().addLast("handler", new PhoneClientHandler());

        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);

        // Start the connection attempt.
        ChannelFuture future = bootstrap.connect(new InetSocketAddress("127.0.0.1",8181));

        // Wait until the connection is closed or the connection attempt fails.
        future.getChannel().getCloseFuture().awaitUninterruptibly();

        // Shut down thread pools to exit.
        bootstrap.releaseExternalResources();
    }
}

Наконец, вам нужно лучше понять, что вы делаете, прочитав множество примеров. Их можно найти внутри пакета org.jboss.netty.example в основном пакете загрузки.

person Renaud    schedule 18.04.2012

У меня была такая же проблема, попробуйте использовать Oio, а не Nio! (просто измените «nio» на «oio» и «Nio» на «Oio».

http://lists.jboss.org/pipermail/netty-users/2009-June/000891.html

person Franz Bettag    schedule 15.01.2013

RenaudBlue@ делает хорошие выводы. Кроме того, я предлагаю переключиться на Netty4, который делает все ByteBuf динамическими и упрощает управление чтением/записью по частям. См. "Перенос клиента".

Например,

private void sendNumbers() {
  // Do not send more than 4096 numbers.
  boolean finished = false;
  MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
  while (out.size() < 4096) {
      if (i <= count) {
          out.add(Integer.valueOf(i));
          i ++;
      } else {
          finished = true;
          break;
      }
  }

  ChannelFuture f = ctx.flush();
  if (!finished) {
      f.addListener(numberSender);
  }
}

private final ChannelFutureListener numberSender = new ChannelFutureListener() {
  @Override
  public void operationComplete(ChannelFuture future) throws Exception {
      if (future.isSuccess()) {
          sendNumbers();
      }
  }
};

Netty4 также имеет безопасность типов для конфигурации параметров канала, что предотвратило бы ошибку "child.tcpNoDelay".

Но большим преимуществом Netty4 является четко определенная модель потоков, которая делает Netty намного проще в использовании.

person cdunn2001    schedule 31.03.2013

Вам необходимо настроить FixedRecvByteBufAllocator из SocketChannel в childHandler() следующим образом:

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2 * 1024));
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, 5));
                pipeline.addLast(new StringEncoder());
                pipeline.addLast(new StringDecoder());
                ...
            }
        });
person thangdc94    schedule 04.01.2018