Я хочу использовать Netflix-Ribbon в качестве балансировщика нагрузки TCP-клиента без Spring Cloud и пишу тестовый код.
public class App implements Runnable
{
public static String msg = "hello world";
public BaseLoadBalancer lb;
public RxClient<ByteBuf, ByteBuf > client;
public Server echo;
App(){
lb = new BaseLoadBalancer();
echo = new Server("localhost", 8000);
lb.setServersList(Lists.newArrayList(echo));
DefaultClientConfigImpl impl = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
client = RibbonTransport.newTcpClient(lb, impl);
}
public static void main( String[] args ) throws Exception
{
for( int i = 40; i > 0; i--)
{
Thread t = new Thread(new App());
t.start();
t.join();
}
System.out.println("Main thread is finished");
}
public String sendAndRecvByRibbon(final String data)
{
String response = "";
try {
response = client.connect().flatMap(new Func1<ObservableConnection<ByteBuf, ByteBuf>,
Observable<ByteBuf>>() {
public Observable<ByteBuf> call(ObservableConnection<ByteBuf, ByteBuf> connection) {
connection.writeStringAndFlush(data);
return connection.getInput();
}
}).timeout(1, TimeUnit.SECONDS).retry(1).take(1)
.map(new Func1<ByteBuf, String>() {
public String call(ByteBuf ByteBuf) {
return ByteBuf.toString(Charset.defaultCharset());
}
})
.toBlocking()
.first();
}
catch (Exception e) {
System.out.println(((LoadBalancingRxClientWithPoolOptions) client).getMaxConcurrentRequests());
System.out.println(lb.getLoadBalancerStats());
}
return response;
}
public void run() {
for (int i = 0; i < 200; i++) {
sendAndRecvByRibbon(msg);
}
}
}
я обнаружил, что он будет создавать новый сокет каждый раз, когда я вызываю sendAndRecvByRibbon
, даже если для poolEnabled
установлено значение true. Итак, это меня смущает, я что-то пропустил? и нет возможности настроить размер пула, но есть PoolMaxThreads
и MaxConnectionsPerHost
.
Мой вопрос заключается в том, как использовать пул соединений в моем простом коде, и что не так с моим sendAndRecvByRibbon
, он открывает сокет, а затем использует его только один раз, как я могу повторно использовать соединение? Спасибо за ваше время.
сервер - это просто простой эхо-сервер, пишущий на pyhton3, я комментирую outconn.close()
, потому что хочу использовать длинное соединение.
import socket
import threading
import time
import socketserver
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
conn = self.request
while True:
client_data = conn.recv(1024)
if not client_data:
time.sleep(5)
conn.sendall(client_data)
# conn.close()
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
if __name__ == "__main__":
HOST, PORT = "localhost", 8000
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
ip, port = server.server_address
server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()
server.serve_forever()
и pom mevan, я просто добавляю две зависимости в автоматически сгенерированный POM IED.
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>com.netflix.ribbon</groupId>
<artifactId>ribbon</artifactId>
<version>2.2.2</version>
</dependency>
код для печати src_port
@Sharable
public class InHandle extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(ctx.channel().localAddress());
super.channelRead(ctx, msg);
}
}
public class Pipeline implements PipelineConfigurator<ByteBuf, ByteBuf> {
public InHandle handler;
Pipeline() {
handler = new InHandle();
}
public void configureNewPipeline(ChannelPipeline pipeline) {
pipeline.addFirst(handler);
}
}
и измените client = RibbonTransport.newTcpClient(lb, impl);
на Pipeline pipe = new Pipeline();client = RibbonTransport.newTcpClient(lb, pipe, impl, new DefaultLoadBalancerRetryHandler(impl));