Однопоточная обработка задачи без постановки в очередь дальнейших запросов

У меня есть требование, чтобы задача выполнялась асинхронно, отбрасывая любые дальнейшие запросы, пока задача не будет завершена.

Синхронизация метода просто ставит задачи в очередь и не пропускает их. Сначала я думал использовать SingleThreadExecutor, но это также ставит задачи в очередь. Затем я посмотрел на ThreadPoolExecutor, но он считывает очередь, чтобы выполнить задачу, и поэтому будет выполняться одна задача и как минимум одна задача в очереди (остальные можно отбросить с помощью ThreadPoolExecutor.DiscardPolicy).

Единственное, что я могу придумать, это использовать семафор для блокировки очереди. Я пришел со следующим примером, чтобы показать, чего я пытаюсь достичь. Есть ли более простой способ? Я пропустил что-то очевидное?

import java.util.concurrent.*;

public class ThreadPoolTester {
    private static ExecutorService executor = Executors.newSingleThreadExecutor();
    private static Semaphore processEntry = new Semaphore(1);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            kickOffEntry(i);

            Thread.sleep(200);
        }

        executor.shutdown();
    }

    private static void kickOffEntry(final int index) {
        if (!processEntry.tryAcquire()) return;
        executor.
            submit(
                new Callable<Void>() {
                    public Void call() throws InterruptedException {
                        try {
                            System.out.println("start " + index);
                            Thread.sleep(1000); // pretend to do work
                            System.out.println("stop " + index);
                            return null;

                        } finally {
                            processEntry.release();
                        }
                    }
                }
            );
    }
}

Пример вывода

start 0
stop 0
start 5
stop 5
start 10
stop 10
start 15
stop 15

Взяв ответ axtavt и преобразовав приведенный выше пример, вы получите следующее более простое решение.

import java.util.concurrent.*;

public class SyncQueueTester {
    private static ExecutorService executor = new ThreadPoolExecutor(1, 1, 
            1000, TimeUnit.SECONDS, 
            new SynchronousQueue<Runnable>(),
            new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            kickOffEntry(i);

            Thread.sleep(200);
        }

        executor.shutdown();
    }

    private static void kickOffEntry(final int index) {
        executor.
            submit(
                new Callable<Void>() {
                    public Void call() throws InterruptedException {
                        System.out.println("start " + index);
                        Thread.sleep(1000); // pretend to do work
                        System.out.println("stop " + index);
                        return null;
                    }
                }
            );
    }
}

person Michael Rutherfurd    schedule 10.02.2011    source источник


Ответы (2)


Похоже, исполнитель, поддерживаемый SynchronousQueue с желаемой политикой, делает то, что вы хотите:

executor = new ThreadPoolExecutor(
    1, 1, 
    1000, TimeUnit.SECONDS, 
    new SynchronousQueue<Runnable>(),
    new ThreadPoolExecutor.DiscardPolicy());
person axtavt    schedule 10.02.2011
comment
Не могли бы вы сказать мне, почему вы предпочитаете использовать DiscardPolicy вместо перехвата исключения RejectedExecutionException? - person Clayton Louden; 16.10.2012

если нет очереди, я бы сказал, что нет необходимости в исполнителе. использование одного только семафора кажется достаточным. я использую приведенный ниже код, чтобы избежать запуска одного и того же кода, когда он уже запущен. просто убедитесь, что semaphore равно static volatile, что делает семафор единственным семафором для класса и распространяет ссылку семафора на кучу других потоков, как только она изменяется

if (this.getSemaphore().tryAcquire()) {
        try {
            process();
        } catch (Exception e) {
        } finally {
            this.getSemaphore().release();
        }
}
else {
    logger.info(">>>>> Job already running, skipping go");
}
person ajitatif    schedule 10.02.2011
comment
Задача должна выполняться в отдельном потоке, чтобы основной поток не блокировался или не подвергался иному воздействию (основной поток на самом деле является приложением Swing). - person Michael Rutherfurd; 10.02.2011