RxNetty повторно использует соединение

Я хочу использовать Netflix-Ribbon в качестве балансировщика нагрузки TCP-клиента без Spring Cloud и пишу тестовый код.

public class App  implements Runnable
{
    public static String msg = "hello world";
    public BaseLoadBalancer lb;
    public RxClient<ByteBuf, ByteBuf >  client;
    public Server echo;

    App(){
        lb = new BaseLoadBalancer();
        echo = new Server("localhost", 8000);
        lb.setServersList(Lists.newArrayList(echo));
        DefaultClientConfigImpl impl = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
        client = RibbonTransport.newTcpClient(lb, impl);
    }
    public static void main( String[] args ) throws Exception
    {
        for( int i = 40; i > 0; i--)
        {
            Thread t = new Thread(new App());
            t.start();
            t.join();
        }
        System.out.println("Main thread is finished");
    }
    public String sendAndRecvByRibbon(final String data) 
    {
        String response = "";
        try {
            response = client.connect().flatMap(new Func1<ObservableConnection<ByteBuf, ByteBuf>,
                    Observable<ByteBuf>>() {
                public Observable<ByteBuf> call(ObservableConnection<ByteBuf, ByteBuf> connection) {

                    connection.writeStringAndFlush(data);
                    return connection.getInput();
                }
            }).timeout(1, TimeUnit.SECONDS).retry(1).take(1)
                    .map(new Func1<ByteBuf, String>() {
                        public String call(ByteBuf ByteBuf) {
                            return ByteBuf.toString(Charset.defaultCharset());
                        }
                    })
                    .toBlocking()
                    .first();
        }
        catch (Exception e) {
            System.out.println(((LoadBalancingRxClientWithPoolOptions) client).getMaxConcurrentRequests());
            System.out.println(lb.getLoadBalancerStats());
        }
        return response;
    }
    public void run() {
        for (int i = 0; i < 200; i++) {
            sendAndRecvByRibbon(msg);
        }
    }

}

я обнаружил, что он будет создавать новый сокет каждый раз, когда я вызываю sendAndRecvByRibbon, даже если для poolEnabled установлено значение true. Итак, это меня смущает, я что-то пропустил? и нет возможности настроить размер пула, но есть PoolMaxThreads и MaxConnectionsPerHost.

Мой вопрос заключается в том, как использовать пул соединений в моем простом коде, и что не так с моим sendAndRecvByRibbon, он открывает сокет, а затем использует его только один раз, как я могу повторно использовать соединение? Спасибо за ваше время.

сервер - это просто простой эхо-сервер, пишущий на pyhton3, я комментирую outconn.close(), потому что хочу использовать длинное соединение.

import socket
import threading
import time
import socketserver
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    def handle(self):
        conn = self.request
        while True:
            client_data = conn.recv(1024)
            if not client_data:
                time.sleep(5)
            conn.sendall(client_data)
#            conn.close()

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    pass

if __name__ == "__main__":
    HOST, PORT = "localhost", 8000
    server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
    ip, port = server.server_address
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.daemon = True
    server_thread.start()
    server.serve_forever()

и pom mevan, я просто добавляю две зависимости в автоматически сгенерированный POM IED.

<dependency>
    <groupId>commons-configuration</groupId>
    <artifactId>commons-configuration</artifactId>
    <version>1.6</version>
</dependency>
<dependency>
    <groupId>com.netflix.ribbon</groupId>
    <artifactId>ribbon</artifactId>
    <version>2.2.2</version>
</dependency>

код для печати src_port

@Sharable
public class InHandle extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(ctx.channel().localAddress());
        super.channelRead(ctx, msg);
    }
}

public class Pipeline implements PipelineConfigurator<ByteBuf, ByteBuf> {
    public InHandle handler;
    Pipeline() {
        handler = new InHandle();
    }
    public void configureNewPipeline(ChannelPipeline pipeline) {
        pipeline.addFirst(handler);
    }
}

и измените client = RibbonTransport.newTcpClient(lb, impl);на Pipeline pipe = new Pipeline();client = RibbonTransport.newTcpClient(lb, pipe, impl, new DefaultLoadBalancerRetryHandler(impl));


person obgnaw    schedule 15.08.2018    source источник


Ответы (1)


Итак, ваш конструктор App() выполняет инициализацию lb/client/etc.

Затем вы запускаете 40 разных потоков с 40 разными экземплярами RxClient (каждый экземпляр по умолчанию имеет собственный пул), вызывая new App() в первом цикле for. Чтобы было ясно, то, как вы создаете здесь несколько экземпляров RxClient, не позволяет им совместно использовать какой-либо общий пул. Вместо этого попробуйте использовать один экземпляр RxClient.

Что, если вы измените свой основной метод, как показано ниже, перестанет ли он создавать дополнительные сокеты?

    public static void main( String[] args ) throws Exception
    {
        App app = new App() // Create things just once
        for( int i = 40; i > 0; i--)
        {
            Thread t = new Thread(()->app.run()); // pass the run()
            t.start();
            t.join();
        }
        System.out.println("Main thread is finished");
    }

Если вышеизложенное не помогает в полной мере (по крайней мере, уменьшит количество созданных сокетов в 40 раз) - не могли бы вы уточнить, как именно вы определяете, что:

я обнаружил, что он будет создавать новый сокет каждый раз, когда я вызываю sendAndRecvByRibbon

и каковы ваши измерения после обновления конструктора этой строкой:

    DefaultClientConfigImpl impl = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
    impl.set(CommonClientConfigKey.PoolMaxThreads,1); //Add this one and test

Обновить

Да, глядя на sendAndRecvByRibbon, кажется, что ему не хватает маркировки PooledConnection как больше не получается при вызове close, когда вы не ожидаете от него дальнейшего чтения.

Пока вы ожидаете единственное событие чтения, просто измените эту строку

connection.getInput()

к

return connection.getInput().zipWith(Observable.just(connection), new Func2<ByteBuf, ObservableConnection<ByteBuf, ByteBuf>, ByteBuf>() {
                        @Override
                        public ByteBuf call(ByteBuf byteBuf, ObservableConnection<ByteBuf, ByteBuf> conn) {
                            conn.close();
                            return byteBuf; 
                        }
                    });

Обратите внимание, что если вы разработаете более сложный протокол через TCP, то входной bytebuf можно проанализировать на наличие вашего конкретного знака «конец связи», который указывает, что соединение может быть возвращено в пул.

person Kostiantyn    schedule 21.08.2018
comment
так же, как и раньше, для обоихnew Thread(()->app.run())и установите PoolMaxThreadsна 1 или 2000. По вашему вопросу вы можете распечатать порт на сервере или клиенте, чтобы найти его, сокет определяется (src_ip, src_port, dest_ip, dest_port). - person obgnaw; 22.08.2018
comment
@obgnaw, не могли бы вы описать точный метод, который вы используете для печати порта src? И обновить свой вопрос с помощью этого фрагмента кода? - person Kostiantyn; 22.08.2018
comment
добавьте его, и я считаю, что пример runalbe. - person obgnaw; 22.08.2018