Предварительная инициализация пула рабочих потоков для повторного использования объектов подключения (сокетов)

Мне нужно создать пул воркеров на Java, где у каждого воркера есть свой собственный подключенный сокет; когда рабочий поток запускается, он использует сокет, но оставляет его открытым для повторного использования позже. Мы выбрали этот подход, потому что накладные расходы, связанные с созданием, подключением и уничтожением сокетов на специальной основе, требовали слишком много накладных расходов, поэтому нам нужен метод, с помощью которого пул рабочих процессов предварительно инициализируется с их подключением к сокетам, готовым к взять на себя работу, сохраняя при этом ресурсы сокета в безопасности от других потоков (сокеты не являются потокобезопасными), поэтому нам нужно что-то в этом направлении ...

public class SocketTask implements Runnable {
  Socket socket;
  public SocketTask(){
    //create + connect socket here
  }

  public void run(){
    //use socket here
  }

}

При запуске приложения мы хотим инициализировать рабочие процессы и, надеюсь, соединения сокетов каким-то образом ...

MyWorkerPool pool = new MyWorkerPool();
for( int i = 0; i < 100; i++)
   pool.addWorker( new WorkerThread());

Поскольку приложение запрашивает работу, мы отправляем задачи в рабочий пул для немедленного выполнения ...

pool.queueWork( new SocketTask(..));


Обновлено с помощью рабочего кода
На основании полезных комментариев Грея и jontejj, у меня есть следующий рабочий код ...

SocketTask

public class SocketTask implements Runnable {
    private String workDetails;
    private static final ThreadLocal<Socket> threadLocal = 
           new ThreadLocal<Socket>(){
        @Override
        protected Socket initialValue(){
            return new Socket();
        }           
    };

    public SocketTask(String details){              
        this.workDetails = details;
    }

    public void run(){      
        Socket s = getSocket(); //gets from threadlocal
        //send data on socket based on workDetails, etc.
    }

    public static Socket getSocket(){
        return threadLocal.get();
    }
}

ExecutorService

ExecutorService threadPool = 
    Executors.newFixedThreadPool(5, Executors.defaultThreadFactory());

    int tasks = 15;  
    for( int i = 1; i <= tasks; i++){
        threadPool.execute(new SocketTask("foobar-" + i));
    }   

Мне такой подход нравится по нескольким причинам ...

  • Сокеты - это локальные объекты (через ThreadLocal), доступные для запущенных задач, что устраняет проблемы параллелизма.
  • Сокеты создаются один раз и остаются открытыми, повторно используются при постановке новых задач в очередь, что устраняет накладные расходы на создание / уничтожение объекта сокета.

person raffian    schedule 21.05.2013    source источник
comment
Спасибо за рабочий фрагмент кода, мне очень помог! Мне было интересно: как закрыть соединения с базой данных, когда пул потоков отключен?   -  person kafman    schedule 05.10.2015
comment
Найдите упорядоченное завершение потоков выполнения, надеюсь, это поможет.   -  person raffian    schedule 06.10.2015
comment
Проверил, спасибо! Я не помог мне полностью ... Я ищу способ вызвать dbconnection.close () для всех соединений db, хранящихся в ThreadLocal. Я действительно не знаю, как подойти к этому ... Я имею в виду, что соединение с базой данных не закрывается автоматически после завершения работы пула потоков, верно?   -  person kafman    schedule 06.10.2015
comment
Думаю, я нашел другое решение: я использую пул потоков и отдельный вектор того же размера для хранения соединений db. На самом деле ThreadLocal в моем случае не нужен, так как мне все равно, какой поток получает какое соединение, если он его получает. В любом случае спасибо за вдохновение!   -  person kafman    schedule 06.10.2015


Ответы (2)


Одна из идей - поместить Socket в BlockingQueue. Затем, когда вам понадобится Socket, ваши потоки могут take() выйти из очереди, а когда они завершат работу с Socket, они put() вернутся в очередь.

public void run() {
    Socket socket = socketQueue.take();
    try {
       // use the socket ...
    } finally {
       socketQueue.put(socket);
    }
}

Это дает дополнительные преимущества:

  • Вы можете вернуться к использованию кода ExecutorService.
  • Вы можете отделить обмен данными через сокет от обработки результатов.
  • Вам не нужно соответствие один-к-одному для обработки потоков и сокетов. Но связь через сокеты может составлять 98% работы, так что, возможно, никакой выгоды.
  • Когда вы закончите и ваш ExecutorService завершится, вы можете выключить свои сокеты, просто исключив их из очереди и закрыв.

Это добавляет дополнительные накладные расходы на другой BlockingQueue, но если вы делаете Socket общение, вы этого не заметите.

мы не верим, что ThreadFactory удовлетворяет наши потребности ...

Я думаю, вы могли бы это сделать, если бы использовали локальные потоки. Ваша фабрика потоков создаст поток, который сначала открывает сокет, сохраняет его в локальном потоке, а затем вызывает Runnable arg, который выполняет всю работу с сокетом, удаляя задания из внутренней очереди ExecutorService. Как только это будет сделано, метод arg.run() завершится, и вы сможете получить сокет из локального потока и закрыть его.

Примерно так. Это немного беспорядочно, но вы должны уловить идею.

ExecutorService threadPool =
    Executors.newFixedThreadPool(10,
      new ThreadFactory() {
        public Thread newThread(final Runnable r) {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    openSocketAndStoreInThreadLocal();
                    // our tasks would then get the socket from the thread-local
                    r.run();
                    getSocketFromThreadLocalAndCloseIt();
                }
            });
            return thread;
        }
      }));

Итак, ваши задачи будут реализованы Runnable и будут выглядеть так:

public SocketWorker implements Runnable {
    private final ThreadLocal<Socket> threadLocal;
    public SocketWorker(ThreadLocal<Socket> threadLocal) {
       this.threadLocal = threadLocal;
    }
    public void run() {
        Socket socket = threadLocal.get();
        // use the socket ...
    }
}
person Gray    schedule 21.05.2013
comment
Нет. ThreadFactory вызывается только один раз для каждого потока. Итак, если ваш пул потоков имеет 10 потоков, сокет создается только при инициализации пула и запуске потоков. Не для каждой задачи @SAFX. - person Gray; 22.05.2013
comment
Помните @SAFX, что метод run () в методе newThread не является запуском вашей задачи. Это метод запуска, который ExecutorService использует для вывода задач из очереди и их запуска. - person Gray; 22.05.2013
comment
Да, отчасти @SAFX. r.run() запускается внутри параллельного кода. Он удаляет ваши задачи из внутренней очереди блокировки задач и вызывает run() вашей задачи (перехват исключений и т.п.). Я добавлю к своему ответу образец класса задачи. - person Gray; 22.05.2013
comment
1) так можно сказать executor.execute(new SocketWorker(socktThreadLocal));. Это позволяет вам определить локальный поток в классе, который запускает задачи. Или вы можете использовать static thread-local в ответе @ jontejj. 2) Ой, это была ошибка. Исправлен @SAFX. - person Gray; 22.05.2013
comment
Получил работу @Gray, без передачи threadLocal в SocketWorker, я просто определил private static final ThreadLocal<Socket> и переопределил initialValue() для создания сокета в первый раз в SocketWorker; любые задачи, отправленные исполнителю, просто повторно используют следующий доступный поток и его локальный сокет, работали как шарм, спасибо. - person raffian; 22.05.2013

Я думаю, вам следует использовать ThreadLocal

package com.stackoverflow.q16680096;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main
{
    public static void main(String[] args)
    {
        ExecutorService pool = Executors.newCachedThreadPool();
        int nrOfConcurrentUsers = 100;
        for(int i = 0; i < nrOfConcurrentUsers; i++)
        {
            pool.submit(new InitSocketTask());
        }

        // do stuff...

        pool.submit(new Task());
    }
}

package com.stackoverflow.q16680096;

import java.net.Socket;

public class InitSocketTask implements Runnable
{
    public void run()
    {
        Socket socket = SocketPool.get();
        // Do initial setup here
    }

}

package com.stackoverflow.q16680096;

import java.net.Socket;

public final class SocketPool
{
    private static final ThreadLocal<Socket> SOCKETS = new ThreadLocal<Socket>(){
        @Override
        protected Socket initialValue()
        {
            return new Socket(); // Pass in suitable arguments here...
        }
    };

    public static Socket get()
    {
        return SOCKETS.get();
    }
}

package com.stackoverflow.q16680096;

import java.net.Socket;

public class Task implements Runnable
{
    public void run()
    {
        Socket socket = SocketPool.get();
        // Do stuff with socket...
    }
}

Где каждый поток получает свой сокет.

person jontejj    schedule 21.05.2013
comment
Создание Socket ThreadLocal делает его принадлежащим потоку, а не работнику, что дает вам возможность получить доступ к сокету непосредственно внутри вашей задачи. Просто не забудьте разогреть ThreadPool с помощью InitTasks, запускающих сокеты. Затем, когда новые задачи будут выполнены, все потоки в ThreadPool будут иметь готовые сокеты. - person jontejj; 22.05.2013
comment
Я обновил свой исходный вопрос рабочим кодом из вашего примера. То, что вы называете SocketPool, я назвал SocketTask в своем коде, но у них обоих есть статический объект ThreadLocal<Socket> для получения локального объекта сокета, вам это кажется правильным? - person raffian; 22.05.2013
comment
WorkerThreadFactory кажется избыточным, что не так с Executors # defaultThreadFactory ()? - person jontejj; 23.05.2013
comment
Думаю, вы правы ... Первоначально я взял это от Грея, который предложил собственный код в run(), но я удалил этот код, так что теперь WorkerThreadFactory является общим и, вероятно, не нужен, спасибо. - person raffian; 23.05.2013
comment
+1 за безумную организацию, лол :) package com.stackoverflow.q16680096; - person Anand Rockzz; 04.11.2015