Нетти: channel.write зависает при отключении

У меня следующая ситуация:

Новое подключение канала открывается таким образом:

    ClientBootstrap bootstrap = new ClientBootstrap(
             new OioClientSocketChannelFactory(Executors.newCachedThreadPool()));

    icapClientChannelPipeline = new ICAPClientChannelPipeline();           
    bootstrap.setPipelineFactory(icapClientChannelPipeline);
    ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
    channel = future.awaitUninterruptibly().getChannel();

Это работает, как и ожидалось.

Материал записывается на канал следующим образом:

channel.write(chunk)

Это также работает, как и ожидалось, когда соединение с сервером все еще активно. Но если сервер выходит из строя (машина отключается), вызов зависает и не возвращается.

Я подтвердил это, добавив операторы журнала до и после файла channel.write(chunk). Когда соединение разрывается, отображается только предыдущая запись журнала.

  1. Чем это вызвано? Я думал, что все эти вызовы асинхронны и немедленно возвращаются? Я также пробовал с NioClientSocketChannelFactory, такое же поведение.

  2. Я пытался использовать channel.getCloseFuture(), но слушатель никогда не вызывается, я пытался проверить канал перед записью с помощью channel.isOpen(), channel.isConnected() и channel.isWritable(), и они всегда верны...

  3. Как обойти это? Никаких исключений не выдается, и на самом деле ничего не происходит... Некоторые вопросы, такие как этот и это указывают, что невозможно обнаружить отключение канала без сердцебиения. Но я не могу реализовать сердцебиение, потому что не могу изменить серверную часть.

Среда: Netty 3, JDK 1.7


person unwichtich    schedule 30.08.2013    source источник


Ответы (1)


Хорошо, я решил это самостоятельно на прошлой неделе, поэтому я добавлю ответ для полноты.

Я был неправ в 3. потому что я думал, что мне придется изменить и клиентскую, и серверную часть для сердцебиения. Как описано в этом вопросе, вы можете использовать IdleStateAwareHandler для этой цели. Я реализовал это так:

Обработчик IdleStateAwareHandler:

public class IdleStateAwareHandler extends IdleStateAwareChannelHandler {

    @Override
    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
        if (e.getState() == IdleState.READER_IDLE) {
            e.getChannel().write("heartbeat-reader_idle");
        }
        else if (e.getState() == IdleState.WRITER_IDLE) {
            Logger.getLogger(IdleStateAwareHandler.class.getName()).log(
                    Level.WARNING, "WriteIdle detected, closing channel");
            e.getChannel().close();
            e.getChannel().write("heartbeat-writer_idle");
        }
        else if (e.getState() == IdleState.ALL_IDLE) {
            e.getChannel().write("heartbeat-all_idle");
        }
    }
}

Трубопровод:

public class ICAPClientChannelPipeline implements ICAPClientPipeline {

        ICAPClientHandler icapClientHandler;
        ChannelPipeline pipeline;

        public ICAPClientChannelPipeline(){
            icapClientHandler = new ICAPClientHandler();
        pipeline = pipeline(); 
            pipeline.addLast("idleStateHandler", new IdleStateHandler(new HashedWheelTimer(10, TimeUnit.MILLISECONDS), 5, 5, 5));
            pipeline.addLast("idleStateAwareHandler", new IdleStateAwareHandler());
            pipeline.addLast("encoder",new IcapRequestEncoder());
            pipeline.addLast("chunkSeparator",new IcapChunkSeparator(1024*4));
            pipeline.addLast("decoder",new IcapResponseDecoder());
            pipeline.addLast("chunkAggregator",new IcapChunkAggregator(1024*4));
            pipeline.addLast("handler", icapClientHandler);            
        }

        @Override
    public ChannelPipeline getPipeline() throws Exception {
            return pipeline;
    }                     
}

Это обнаруживает любое состояние ожидания чтения или записи на канале через 5 секунд. Как видите, это немного специфично для ICAP, но это не имеет значения для вопроса.

Чтобы реагировать на событие простоя, мне нужен следующий слушатель:

channel.getCloseFuture().addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
            doSomething();
    }
});
person unwichtich    schedule 25.09.2013