Как добавить пакетную обработку, неявную для клиента?

Рассмотрим следующий код:

Код клиента:

public class MyClient {
    private final MyClientSideService myClientSideService;

    public MyClient(MyClientSideService myClientSideService) {
        this.myClientSideService = myClientSideService;
    }

    public String requestRow(Integer req) {
        return myClientSideService.requestSingleRow(req);
    }
}

Служба на стороне клиента:

public class MyClientSideService {
    private final MyServerSideService myServerSideService;

    public MyClientSideService(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        return myServerSideService.requestRowBatch(Arrays.asList(req)).get(0);
    }
}

Служба на стороне сервера:

@Slf4j
public class MyServerSideService {
    //single threaded bottleneck service
    public synchronized List<String> requestRowBatch(List<Integer> batchReq) {
        log.info("Req for {} started");
        try {
            Thread.sleep(100);
            return batchReq.stream().map(String::valueOf).collect(Collectors.toList());

        } catch (InterruptedException e) {
            return null;
        } finally {
            log.info("Req for {} finished");

        }
    }
}

И главное:

@Slf4j
public class MainClass {
    public static void main(String[] args) {
        MyClient myClient = new MyClient(new MyClientSideService(new MyServerSideService()));
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                for (int m = 0; m < 100; m++) {
                    int k = m;
                    log.info("Response is {}", myClient.requestRow(k));
                }
            }).start();
        }
    }
}

По логам это занимает примерно 4 мин 22 сек, но это слишком много. Я думаю, что это может быть значительно улучшено. Я хотел бы реализовать неявную пакетную обработку. Таким образом, MyClientSideService должен собирать запросы, и когда он становится 50 (это предварительно настроенный размер пакета) или истекает некоторый предварительно настроенный тайм-аут, затем запрашивать MyServerSideService и возвращать результат маршрута клиентам. Протокол должен быть синхронным, поэтому клиенты должны быть заблокированы до получения результата.

Я пытался писать код, используя CountDownLatches и CyclicBarriers, но мои попытки были далеки от успеха.

Как я могу достичь своей цели?

P.S.

Если заменить requestRowBatch, вернуть тип List<String> from на Map<Integer, String>, чтобы делегировать отображение запросов и ответов на сервер, следующие работы с ограничениями. Это работает, только если я отправляю ‹ = 25 запросов

@Slf4j
public class MyClientSideService {
    private final Integer batchSize = 25;
    private final Integer maxTimeoutMillis = 5000;
    private final MyServerSideService myServerSideService;
    private final Queue<Integer> queue = new ArrayBlockingQueue(batchSize);
    private final Map<Integer, String> responseMap = new ConcurrentHashMap();
    private final AtomicBoolean started = new AtomicBoolean();

    private CountDownLatch startBatchRequestLatch = new CountDownLatch(batchSize);
    private CountDownLatch awaitBatchResponseLatch = new CountDownLatch(1);


    public MyClientSideService(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        queue.offer(req);
        if (!started.compareAndExchange(false, true)) {
            log.info("Start batch collecting");
            startBatchCollecting();
        }
        startBatchRequestLatch.countDown();
        try {
            log.info("Awaiting batch response latch for {}...", req);
            awaitBatchResponseLatch.await();
            log.info("Finished awaiting batch response latch for {}...", req);
            return responseMap.get(req);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return "EXCEPTION";
        }
    }

    private void startBatchCollecting() {
        new Thread(() -> {
            try {
                log.info("Await startBatchRequestLatch");
                startBatchRequestLatch.await(maxTimeoutMillis, TimeUnit.MILLISECONDS);
                log.info("await of startBatchRequestLatch finished");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            responseMap.putAll(requestBatch(queue));
            log.info("Releasing batch response latch");
            awaitBatchResponseLatch.countDown();

        }).start();
    }

    public Map<Integer, String> requestBatch(Collection<Integer> requestList) {

        return myServerSideService.requestRowBatch(requestList);
    }
}

Обновлять

Согласно ответу Солода, я смог разработать следующее:

@Slf4j
public class MyClientSideServiceCompletableFuture {
    private final Integer batchSize = 25;
    private final Integer maxTimeoutMillis = 5000;
    private final MyServerSideService myServerSideService;
    private final Queue<Pair<Integer, CompletableFuture>> queue = new ArrayBlockingQueue(batchSize);
    private final AtomicInteger counter = new AtomicInteger(0);
    private final Lock lock = new ReentrantLock();

    public MyClientSideServiceCompletableFuture(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        CompletableFuture<String> future = new CompletableFuture<>();
        lock.lock();
        try {
            queue.offer(Pair.of(req, future));
            int counter = this.counter.incrementAndGet();
            if (counter != 0 && counter % batchSize == 0) {
                log.info("request");
                List<Integer> requests = queue.stream().map(p -> p.getKey()).collect(Collectors.toList());
                Map<Integer, String> serverResponseMap = requestBatch(requests);
                queue.forEach(pair -> {
                    String response = serverResponseMap.get(pair.getKey());
                    CompletableFuture<String> value = pair.getValue();
                    value.complete(response);
                });
                queue.clear();
            }
        } finally {
            lock.unlock();
        }
        try {
            return future.get();
        } catch (Exception e) {
            return "Exception";
        }
    }


    public Map<Integer, String> requestBatch(Collection<Integer> requestList) {

        return myServerSideService.requestRowBatch(requestList);
    }
}

Но это не работает, если размер не кратен размеру партии.


person gstackoverflow    schedule 01.11.2019    source источник
comment
Возможно, попробуйте что-то вроде CoalescingBulkloader, который позволяет кэшу неявно выполнять пакетные вызовы.   -  person Ben Manes    schedule 02.11.2019
comment
@BenManes Вы имеете в виду построить мое решение на основе идеи из источника, на который вы ссылаетесь, или вы имеете в виду, что я должен использовать библиотеку, на которую вы ссылаетесь напрямую, для создания решения?   -  person gstackoverflow    schedule 03.11.2019
comment
Любой был бы в порядке. Этот код следует той же идее, предложенной @Malt. Это предоставленный пример, а не часть библиотеки, поэтому напрямую не поддерживается. Есть также такие библиотеки, как Reactor, которые могут представлять интерес.   -  person Ben Manes    schedule 03.11.2019
comment
@Ben Manes Честно говоря, я не понял, как применить это для моего примера. не могли бы вы предоставить более подробную информацию, пожалуйста?   -  person gstackoverflow    schedule 05.11.2019
comment
@Ben Manes, пожалуйста, посмотрите обновление темы   -  person gstackoverflow    schedule 05.11.2019
comment
@Ben Manes, пожалуйста, посмотрите stackoverflow.com/a/58725718/2674303   -  person gstackoverflow    schedule 06.11.2019


Ответы (3)


Если заменить возвращаемый тип requestRowBatch с List<String> на Map<Integer, String>, чтобы делегировать сопоставление запроса и ответа серверу, я смог создать следующее решение:

@Slf4j
public class MyClientSideServiceCompletableFuture {
    private final Integer batchSize = 25;
    private final Integer timeoutMillis = 5000;
    private final MyServerSideService myServerSideService;
    private final BlockingQueue<Pair<Integer, CompletableFuture>> queue = new LinkedBlockingQueue<>();

    private final Lock lock = new ReentrantLock();
    private final Condition requestAddedCondition = lock.newCondition();


    public MyClientSideServiceCompletableFuture(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
        startQueueDrainer();
    }

    public String requestSingleRow(int req) {
        CompletableFuture<String> future = new CompletableFuture<>();
        while (!queue.offer(Pair.of(req, future))) {
            log.error("Can't add {} to the queue. Retrying...", req);
        }
        lock.lock();
        try {
            requestAddedCondition.signal();
        } finally {
            lock.unlock();
        }
        try {
            return future.get();
        } catch (Exception e) {
            return "Exception";
        }
    }

    private void startQueueDrainer() {
        new Thread(() -> {
            log.info("request");
            while (true) {
                ArrayList<Pair<Integer, CompletableFuture>> requests = new ArrayList<>();
                if (queue.drainTo(requests, batchSize) > 0) {
                    log.info("drained {} items", requests.size());
                    Map<Integer, String> serverResponseMap = requestBatch(requests.stream().map(Pair::getKey).collect(Collectors.toList()));
                    requests.forEach(pair -> {
                        String response = serverResponseMap.get(pair.getKey());
                        CompletableFuture<String> value = pair.getValue();
                        value.complete(response);
                    });
                } else {
                    lock.lock();
                    try {
                        while (queue.size() == 0) {
                            try {
                                log.info("Waiting on condition");
                                requestAddedCondition.await(timeoutMillis, TimeUnit.MILLISECONDS);
                                log.info("Waking up on condition");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    } finally {
                        lock.unlock();
                    }
                }

            }
        }).start();
    }


    public Map<Integer, String> requestBatch(Collection<Integer> requestList) {
        return myServerSideService.requestRowBatch(requestList);
    }
}

Похоже на рабочее решение. Но я не уверен, что это оптимально.

person gstackoverflow    schedule 06.11.2019

Ваше решение MyClientSideServiceCompletableFuture будет отправлять запросы на сервер каждый раз, когда вы добавляете что-то в очередь, и не ждет, пока запросы будут иметь пакетный размер. Вы используете BlockingQueue и добавляете ненужное условие блокировки и блокировки. BlockingQueue имеет возможности блокировки тайм-аута, поэтому никаких дополнительных условий не требуется.

Вы можете упростить свое решение следующим образом:

Он отправляет запросы на сервер только тогда, когда пакет заполнен или истек тайм-аут, а пакет не пуст.

private void startQueueDrainer() {

        new Thread(() -> {
            log.info("request");
            ArrayList<Pair<Integer, CompletableFuture>> batch = new ArrayList<>(batchSize);
            while (true) {
                try {
                    batch.clear(); //clear batch
                    long timeTowWait = timeoutMillis;
                    long startTime = System.currentTimeMillis();

                    while (timeTowWait > 0 && batch.size() < batchSize) {
                        Pair<Integer, CompletableFuture> request = queue.poll(timeTowWait , TimeUnit.MILLISECONDS);
                        if(request != null){
                          batch.add(request);
                        }
                        long timeSpent = (System.currentTimeMillis() - startTime);
                        timeTowWait = timeTowWait - timeSpent;
                    }

                    if (!batch.isEmpty()) {
                        // we wait at least timeoutMillis or batch is full
                        log.info("send {} requests to server", batch.size());
                        Map<Integer, String> serverResponseMap = requestBatch(batch.stream().map(Pair::getKey).collect(Collectors.toList()));
                        batch.forEach(pair -> {
                            String response = serverResponseMap.get(pair.getKey());
                            CompletableFuture<String> value = pair.getValue();
                            value.complete(response);
                        });
                    } else {
                        log.info("We wait {} but the batch is still empty", System.currentTimeMillis() - startTime);
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

Измените метод requestSingleRow, чтобы он не использовал блокировку.

  public String requestSingleRow(int req) {
        CompletableFuture<String> future = new CompletableFuture<>();

        while (!queue.offer(Pair.of(req, future))) {
            log.error("Can't add {} to the queue. Retrying...", req);
        }

        try {
            return future.get();
        } catch (Exception e) {
            return "Exception";
        }
    }
person HPCS    schedule 07.11.2019

Вы можете использовать CompletableFuture.
Попросите потоки, вызывающие MyClientSideService, поместить свой запрос в Queue (возможно, BlockingQueue, и получить взамен новый CompletableFuture. Вызывающий поток может вызвать CompletableFuture.get() для блокировки до тех пор, пока не будет готов результат, или продолжить выполнение других действий. .

Этот CompletableFuture будет храниться вместе с запросом в MyClientSideService. Когда вы достигнете 50 запросов (и, следовательно, 50 CompletableFuture экземпляров), попросите клиентскую службу отправить пакетный запрос.

По завершении запроса используйте метод CompletableFuture.complete(value) каждого экземпляра ComplatableFuture в очереди, чтобы уведомить поток клиента о готовности ответа. Это разблокирует клиент, если он вызвал метод блокировки, такой как CompletableFuture.get(), или заставит его немедленно вернуться со значением, если он будет вызван позже.

person Malt    schedule 01.11.2019
comment
В этом случае MyClient должен ожидать Future[String] вместо String, если я правильно вас понял. Также я не вижу, как выполнить хвост очереди, если я отправлю задачу 99, тогда последние 49 вообще не будут выполняться - person gstackoverflow; 02.11.2019
comment
пожалуйста, посмотрите stackoverflow.com/a/58725718/2674303 - person gstackoverflow; 06.11.2019