Как отправлять электронные письма из пакетного задания Java EE

У меня есть требование ежедневно обрабатывать список большого количества пользователей, чтобы отправлять им уведомления по электронной почте и SMS на основе некоторого сценария. Для этого я использую модель пакетной обработки Java EE. Моя работа xml выглядит следующим образом:

<step id="sendNotification">
    <chunk item-count="10" retry-limit="3">
        <reader ref="myItemReader"></reader>
        <processor ref="myItemProcessor"></processor>
        <writer ref="myItemWriter"></writer>
        <retryable-exception-classes>
            <include class="java.lang.IllegalArgumentException"/>
        </retryable-exception-classes>
    </chunk>
</step>

Метод MyItemReader onOpen считывает всех пользователей из базы данных, а readItem() считывает по одному пользователю за раз, используя итератор списка. В myItemProcessor фактическое уведомление по электронной почте отправляется пользователю, а затем пользователи сохраняются в базе данных в классе myItemWriter для этого фрагмента.

@Named
public class MyItemReader extends AbstractItemReader {

    private Iterator<User> iterator = null;
    private User lastUser;

    @Inject
    private MyService service;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        super.open(checkpoint);

        List<User> users = service.getUsers();
        iterator = users.iterator();

        if(checkpoint != null) {
            User checkpointUser = (User) checkpoint;
            System.out.println("Checkpoint Found: " + checkpointUser.getUserId());
            while(iterator.hasNext() && !iterator.next().getUserId().equals(checkpointUser.getUserId())) {
                System.out.println("skipping already read users ... ");
            }
        }
    }

    @Override
    public Object readItem() throws Exception {

        User user=null;

        if(iterator.hasNext()) {
            user = iterator.next();
            lastUser = user;
        }
        return user;
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return lastUser;
    }
}

Моя проблема в том, что контрольная точка хранит последнюю запись, которая была выполнена в предыдущем фрагменте. Если у меня есть чанк со следующими 10 пользователями, и в myItemProcessor 5-го пользователя возникает исключение, то при повторной попытке будет выполнен весь чанк, и все 10 пользователей будут обработаны снова. Я не хочу, чтобы уведомление снова отправлялось уже обработанным пользователям.

Есть ли способ справиться с этим? Как это должно быть сделано эффективно?

Любая помощь будет высоко оценен. Спасибо.


person Faiza Aslam    schedule 29.11.2017    source источник
comment
Я бы подумал о переносе отправки электронной почты на ItemWriter.writeItems, даже если в итоге вы просматриваете элементы и отправляете их по одному. Но сначала не могли бы вы объяснить логику повторных попыток обработки IllegalArgumentException? Это вызвано чтением данных, какой-либо другой обработкой перед отправкой или отправкой электронной почты? И почему поможет повторная попытка (почему в следующий раз исключение не произойдет)?   -  person Scott Kurz    schedule 30.11.2017
comment
Эй, Скотт, IllegalArgumentException предназначен только для целей тестирования. Фактическим исключением будет исключение Socket или Timeout, с которым мы иногда сталкиваемся в нашей службе sms. Это можно кинуть в процессорную часть.   -  person Faiza Aslam    schedule 01.12.2017


Ответы (2)


Я собираюсь опираться на комментарии @cheng. Я отдаю ему должное здесь, и, надеюсь, мой ответ принесет дополнительную пользу в организации и полезном представлении вариантов.

Ответ: поставьте в очередь сообщения для другой MDB, чтобы они отправлялись для отправки электронных писем.

Фон:

Как указал @cheng, сбой означает, что вся транзакция откатывается, а контрольная точка не продвигается.

Итак, как быть с тем фактом, что ваш блок отправил электронные письма некоторым пользователям, но не всем? (Можно сказать, что он откатился, но с «побочными эффектами».)

Таким образом, мы могли бы переформулировать ваш вопрос следующим образом: Как отправить электронное письмо из шага пакетной обработки?

Ну, если у вас есть способ отправлять электронные письма через транзакционный API (реализующий XAResource и т. д.), вы можете использовать этот API.

Предполагая, что вы этого не сделаете, я бы выполнил транзакционную запись в очередь JMS, а затем отправил электронные письма с отдельной MDB (как предложил @cheng в одном из своих комментариев).

Предлагаемая альтернатива: используйте ItemWriter для отправки сообщений в очередь JMS, а затем используйте отдельную MDB для фактической отправки электронных писем.

При таком подходе вы по-прежнему получаете эффективность за счет пакетной обработки и обновлений вашей БД (вы все равно отправляли электронные письма только по одному), и вы можете извлечь выгоду из простой проверки и перезапуска без необходимости писать сложную обработку ошибок.

Это также может быть повторно использовано в качестве шаблона для пакетных заданий и даже вне пакета.

Другие альтернативы

Некоторые другие идеи, которые я не считаю такими хорошими, перечислены для обсуждения:

Добавление логики пакетного приложения для отслеживания пользователей по электронной почте (с помощью ItemProcessListener)

Вы можете создать свой собственный список успешных/неудачных писем, используя методы ItemProcessListener: afterProcess и onProcessError.

Таким образом, при перезапуске вы можете узнать, какие пользователи были отправлены по электронной почте в текущем блоке, на который мы переместились после отката всего блока, даже если некоторые электронные письма уже были отправлены.

Это, безусловно, усложняет вашу пакетную логику, и вы также должны каким-то образом сохранять этот список успехов или неудач. Кроме того, этот подход, вероятно, очень специфичен для этой работы (в отличие от очереди для обработки MDB).

Но это проще, поскольку у вас есть одно пакетное задание без необходимости в поставщике сообщений и отдельном компоненте приложения.

Если вы пойдете по этому пути, вы можете использовать комбинацию пропускаемого и повторного исключения без отката.

кусок одного элемента

Если вы определяете свой фрагмент с помощью item-count="1", вы избегаете сложного кода с контрольными точками и обработкой ошибок. Однако вы жертвуете эффективностью, поэтому это имело бы смысл только в том случае, если бы другие аспекты пакетной обработки были очень привлекательными: например. планирование и управление заданиями через общий интерфейс, возможность перезапуска на неудачном этапе задания

Если вы пойдете по этому пути, вы можете захотеть определить исключения сокетов и тайм-аутов как исключения без отката (используя ), поскольку от отката ничего не выиграешь, и вы можете захотеть повторите попытку из-за тайм-аута сети.

Поскольку вы конкретно упомянули об эффективности, я предполагаю, что это вам не подходит.

использовать синхронизацию транзакций

Возможно, это сработает, но пакетный API не делает это особенно простым, и у вас все еще может быть случай, когда блок завершается, но одно или несколько писем не отправляются.

person Scott Kurz    schedule 04.12.2017
comment
Как я уже упоминал в ответе, @cheng изложил ключевую идею и диагностировал проблему, так что спасибо ему, и, надеюсь, эта запись будет дополнительно полезна. - person Scott Kurz; 05.12.2017
comment
Привет, Скотт, спасибо за подробный ответ с таким количеством альтернатив. Я бы предпочел использовать подход MDB. Спасибо еще раз :) - person Faiza Aslam; 06.12.2017

Ваш текущий обработчик элементов делает что-то за пределами области транзакции фрагмента, что привело к рассинхронизации состояния приложения. Если вам требуется отправлять электронные письма только после того, как все элементы в блоке будут успешно завершены, вы можете переместить часть электронной почты в ItemWriterListener.afterWrite(items).

person cheng    schedule 30.11.2017
comment
Эй, Ченг, спасибо за ответ. Скажите, пожалуйста, как убедиться, что мой обработчик элементов находится в области транзакции фрагмента. Кроме того, в моем ItemProcessor я отправляю электронные письма только пользователям. Если я перемещу его в WriterListener, как я могу убедиться, что электронное письмо было отправлено пользователю? - person Faiza Aslam; 30.11.2017
comment
Отправка электронных писем предполагает использование нетранзакционных ресурсов, поэтому я не думаю, что вы можете включить это в транзакцию фрагмента, и, следовательно, невозможно отозвать уже отправленные электронные письма, если некоторые элементы в одном и том же фрагменте не удались. Я чувствую, что часть отправки по электронной почте относительно отделена от остальной логики обработки фрагментов, поэтому я предложил перенести ее в прослушиватель записи. - person cheng; 30.11.2017
comment
Как обрабатывать любые возможные сбои во время отправки электронной почты, зависит от ваших бизнес-требований. Это кажется слишком неуклюжим, чтобы провалить чанк из-за ошибок при отправке писем. Поскольку вы пытаетесь выполнить 2 разные задачи за один шаг, становится очень сложно найти хорошую стратегию отката/повторения, которая удовлетворит обе задачи. Может быть, вы можете разделить часть отправки по электронной почте на последующий шаг, который считывается из выходной таблицы первого шага? - person cheng; 30.11.2017
comment
Чтобы включить отправку электронной почты в транзакцию блока, платформа/библиотека, используемая для отправки электронной почты, должна быть транзакционной и поддерживать XA. Например, вы можете добавить очередь JMS в свое приложение и заставить обработчик элементов отправлять сообщение для каждого обрабатываемого элемента данных в очередь в том же контексте транзакции фрагмента. Затем прослушиватель сообщений очереди (например, MessageDriven EJB) может отправлять электронные письма. Однако это добавляет дополнительную сложность вашему приложению. - person cheng; 01.12.2017
comment
Эй, Ченг, меня беспокоит только то, почему модель пакетной обработки не вызывает метод checkpointInfo в случае какого-либо исключения. Он вызывается только при успешном выполнении чанка. Таким образом, он сохраняет только мой последний элемент последнего фрагмента, а не последний элемент текущего фрагмента... - person Faiza Aslam; 01.12.2017
comment
Все элементы в чанке считаются единой единицей работы и записываются вместе в одной транзакции в модуле записи элементов. Если во время записи возникает какой-либо сбой, он не может быть вызван каким-либо конкретным элементом данных. Или даже так, почти невозможно отследить этот конкретный предмет. - person cheng; 03.12.2017