Повторяемый пул потоков путем переопределения afterExecute(Runnable r, Throwable t)

Я хочу реализовать пул потоков, чтобы задачи могли выполняться в определенное время, переопределяя хук afterExecute. Могу ли я снова отправить аргумент Runnable r?

Вот моя первоначальная реализация.

public class RetriableThreadPool extends ThreadPoolExecutor {

  static final int MAXRETRYTIMES = 5;

  int retryTimes = 0;

  public RetriableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    retryTimes = 0;
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    if (retryTimes < MAXRETRYTIMES) {
      retryTimes++;
      super.submit(r);
    }
  }

}

В этой первоначальной реализации я просто разрешаю отправить одну задачу.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {

  public static void main(String[] args) {
    RetriableThreadPool retriableThreadPool = new RetriableThreadPool(10, 10, 0L,
        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    retriableThreadPool.execute(new Runnable() {
      int num = 0;

      @Override
      public void run() {
        // TODO Auto-generated method stub
        num = num + 123;
        System.out.println(num);
      }

    });
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    // retriableThreadPool.shutdown();
  }
}

В этом примере я получил странный вывод:

123
246

Если runnable может быть повторно отправлен, я думаю, что должен получить 5 выходов. Если это не может быть повторно представлено. Только 123 должно быть результатом. Я не понимаю причину этого вывода.


Я изменил код благодаря nogard

public class RetriableThreadPool extends ThreadPoolExecutor {

  static final int MAXRETRYTIMES = 5;

  int retryTimes = 0;

  public RetriableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    retryTimes = 0;
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    if (retryTimes < MAXRETRYTIMES) {
      retryTimes++;
      super.execute(r);
    }
  }
}

У меня есть еще 3 вопроса:

  1. Как повторить запуск с исходным состоянием. В этом случае я ожидал, что результаты будут в 5 раз больше 123.
  2. Как добавить хуки для метода submit так же, как afterExecute для execute
  3. Есть ли уже хорошая реализация retriable threadpool? Я хочу, чтобы runnable повторялся, когда выбрасываются исключения или callable возвращает определенные результаты.

person chenatu    schedule 13.01.2016    source источник


Ответы (1)


Я думаю, что причина такого поведения в том, что вы отправляете задачу в метод afterExecute, а не выполняете, и отправка больше не вызовет afterExecute обратного вызова. Вот почему вы видите только 2 строки в выводе: первая из исходного выполнения, а вторая из отправки.

Более того, вы никогда не увеличиваете счетчик повторных попыток, ваша задача всегда будет повторно отправлена

@Override
protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    ++ retryTimes;
    if (retryTimes < MAXRETRYTIMES) {
        super.execute(r);
    }
}

Обновление для ваших 3 вопросов:

  1. Есть несколько вариантов:

    • don't change the state inside Runnable (don't assign to num)
    • создать новый экземпляр Runnable (или скопировать экземпляр)
    • сбросить состояние Runnable
  2. Для хука я бы реализовал шаблон Decorator: что-то вроде этого:

    public class YourExecutor {
    @Override
    public void submit(Runnable task) {
        return super.submit(new TaskDecorator(task));
    }
    
    protected void onCompletedTask(Runnable task) {
        // callback
    }
    
    private class TaskDecorator implements Runnable {
        private final Runnable delegate;
    
        public TaskDecorator(Runnable delegate) {
            this.delegate = delegate;
        }
    
        @Override
        public void run() {
            this.delegate.run();
            onCompletedTask(delegate);
        }
    }
    
person nogard    schedule 13.01.2016
comment
Спасибо, я отредактировал код, чтобы прояснить проблему. У меня есть 2 вопроса. как повторить запуск с начальным состоянием. В этом случае я ожидал, что вывод будет 5 раз по 123. Другой вопрос заключается в том, как запускать хуки, такие как afterExecute, при использовании submit() - person chenatu; 13.01.2016