JSR 352 с профилем Liberty — как реализовать контрольную точку, когда ItemReader выполняет запрос к БД

У меня есть 10 записей в моей исходной таблице, и у меня есть количество элементов как 3.

У меня есть 2 раздела для обработки этих 10 записей (т.е. первые 5 записей будут обрабатываться в первом разделе, а остальные записи будут обрабатываться во втором разделе при обработке записей во втором разделе. Я выбрасываю исключение, поэтому задание не будет выполнено во втором фрагменте второго раздела. когда я перезапускаю задание, сбойный раздел снова обрабатывает все записи (то есть первый фрагмент и 2-й фрагмент).Перезапуск задания должен обрабатывать только последние сбойные записи фрагмента, но не все записи в этом разделе.Можете ли вы помочь мне как этого добиться?

Мой JSL выглядит следующим образом:

    <?xml version="1.0" encoding="UTF-8"?>
<job xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
    xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
    id="readingfrom-db" restartable="true" version="1.0" >
    <properties >
        <property name="numRec" value="#{jobParameters['numRec']}?:5;"/>        
        <property name="chunkSize" value="#{jobParameters['chunkSize']}?:3;"/>
        <property name="whereclauseFrom" value="#{jobParameters['whereclauseFrom']}?:5;"/>
        <property name="whereclauseTo" value="#{jobParameters['whereclauseTo']}?:6;"/>      
        <property name="dsJNDI" value="#{jobParameters['dsJNDI']}?:jdbc/db2;"/>
        <property name="dsJNDI1" value="#{jobParameters['dsJNDI1']}?:jdbc/db2;"/>
        <property name="tableName" value="#{jobParameters['tableName']}?:CISDW.AIF1_CH;"/>
        <property name="ProcesstableName" value="#{jobParameters['ProcesstableName']}?:CISDW.PROC_AIF1_CH;"/>
    </properties>
    <step id="runcache" next="readFromDB">
        <batchlet ref="com.cdc.runcache.CacheRunnerBatchlet" />
    </step>
    <step id="readFromDB">
        <listeners>
            <listener ref="com.cdc.dbreader.LogExceptionListener"/>
        </listeners> 
        <chunk item-count="3" checkpoint-policy="item">
            <reader ref="com.cdc.dbreader.DBItemReader">
                <properties >
                    <property name="dsJNDI" value="#{jobProperties['dsJNDI']}"/>
                    <property name="tableName" value="#{jobProperties['tableName']}"/>
                    <property name="whereclauseFrom" value="#{partitionPlan['modrec']}"/>                   
                </properties>
            </reader>
            <processor ref="com.cdc.dbreader.DBItemProcessor" />            
            <writer ref="com.cdc.dbreader.DBItemWriter">
                <properties >
                    <property name="dsJNDI" value="#{jobProperties['dsJNDI1']}"/>
                    <property name="tableName" value="#{jobProperties['ProcesstableName']}"/>
                </properties>
            </writer>
        </chunk>
        <partition>
            <plan partitions="2" threads="2">
                <properties partition="0">
                    <property name="modrec" value="#{jobProperties['whereclauseFrom']}"/>                   
                </properties>
                <properties partition="1">
                    <property name="modrec" value="#{jobProperties['whereclauseTo']}"/>
                </properties>
            </plan>
        </partition>        
    </step>                     
</job>

Мой читатель предметов выглядит следующим образом:

 public class DBItemReader implements ItemReader {  
    @Inject
    @BatchProperty
    private String dsJNDI;

    @Inject
    @BatchProperty
    private String whereclauseFrom;


    @Inject
    @BatchProperty
    private String tableName;

    private Connection conn =null;
    private int totalRecords=0;

    private DataSource ds = null;
    List<RecObj> listRecObj=new ArrayList<RecObj>();    

    @Override
    public Object readItem() throws SQLException {
         if (listRecObj.size() == 0) {             
             return null;
         } else { 
             RecObj rec =null;           
             Iterator<RecObj> iter =listRecObj.iterator();
             while (iter.hasNext()) {               
                rec = iter.next();               

               if (Integer.parseInt(rec.getRec())  == 7) {                      
                  throw new IllegalStateException("Thrown Error");
                }
                iter.remove();
                return rec;
             }
             return rec;
         }


     @Override
    public void open(Serializable arg0) throws NamingException, SQLException {
          ds = DataSource.class.cast(new InitialContext().lookup(dsJNDI)); 
//        System.out.println("whereclauseFrom: " + whereclauseFrom);          
          conn = ds.getConnection(); 
          String sql ="";
          if(Integer.parseInt(whereclauseFrom) == 5){
              sql = "SELECT * FROM " + tableName + " WHERE  CAST(REC AS INTEGER) <= "+ whereclauseFrom;
          }else if(Integer.parseInt(whereclauseFrom) == 6){
              sql = "SELECT * FROM " + tableName + " WHERE  CAST(REC AS INTEGER) >= "+ whereclauseFrom;
          }

          PreparedStatement ps = conn.prepareStatement(sql);
          ResultSet rs=ps.executeQuery();
          while(rs.next()){
             totalRecords++;
             String rec=rs.getString("REC"); 
             if(rec != null)
                listRecObj.add(new RecObj(rec));

          }          
          rs.close();          
    }   
    @Override
    public void close() throws SQLException {
        conn.close();       
    }   
    @Override
    public Serializable checkpointInfo() {       
            return null;
    }

}
    }

МОЙ класс Writer выглядит следующим образом:

public class DBItemWriter extends AbstractItemWriter implements ItemWriter {    
    @Inject
    @BatchProperty
    private String dsJNDI;

    @Inject
    @BatchProperty
    private String tableName;

    private DataSource ds = null; 

    @Override
    public void open(Serializable arg0) throws NamingException {
         ds = DataSource.class.cast(new InitialContext().lookup(dsJNDI));            
    }

    @Override
    public void writeItems(List<java.lang.Object> items) throws BatchUpdateException,SQLException{      
        Connection conn = ds.getConnection();           
        String sql = "INSERT INTO "+tableName+ "(MOD_REC) VALUES(?) ";       
        PreparedStatement ps = conn.prepareStatement(sql);        
        for (Object obj : items) {          
            RecObj v = (RecObj)obj;
            System.out.println("=======Writer values===="+v.getRec());                
            ps.setString(1, v.getRec());            
            ps.addBatch();
        }        
        ps.executeBatch();
        ps.clearBatch();
        ps.close();
        conn.close();
    }
}

Ниже мой процессор:

public class DBItemProcessor implements ItemProcessor {
    Integer count=0;   
    @Override
    public Object processItem(Object arg0) {
        count++;
        RecObj v=(RecObj)arg0;
        String vname=v.getRec();
        System.out.println("=========Processer Values==="+vname);
        return new RecObj(vname+count);
    }
}

Ниже мой класс Bean

public class RecObj {
   private String rec;


  public RecObj(String rec) {
    this.rec=rec;
}

person Srinivas K    schedule 23.05.2016    source источник
comment
Похоже, у вас есть правильное базовое понимание, так почему бы вам не поделиться некоторыми фрагментами кода вместе с JSL (XML), используемым для определения задания. В частности, вероятно, было бы полезно увидеть, как вы параметризуете свой ридер (вводя значения @BatchProperty) для каждого раздела, а также код, который вы используете для реализации позиционирования на основе значения контрольной точки.   -  person Scott Kurz    schedule 23.05.2016
comment
Я добавил все свои артефакты в свой вопрос. Не могли бы вы проверить мой код и не могли бы вы помочь, как решить мою проблему?   -  person Srinivas K    schedule 23.05.2016
comment
Я пока не вижу ничего плохого. Если вы добавите метод читателя open(), который использует значение контрольной точки для построения запроса к БД, а также любой код, который у вас есть, бросающий exc на 2-й фрагмент, я посмотрю еще раз.   -  person Scott Kurz    schedule 23.05.2016
comment
Я добавил метод открытия класса Reader. Я читаю первые 5 записей в 1-м разделе на основе пакетного свойства whereclauseFrom.same во 2-м разделе. Я читаю оставшиеся 5 записей во 2-м разделе.   -  person Srinivas K    schedule 23.05.2016


Ответы (1)


Вам необходимо вернуть значение контрольной точки в checkpointInfo() вашего ридера, которое будет передано в метод вашего ридера open() при перезапуске. Вот как считыватель и пакетный контейнер координируются, чтобы обеспечить контрольную точку при перезапуске.

Таким образом, у вас может быть что-то вроде (ищите комментарии CHECKPOINT):

public class DBItemReader implements ItemReader {  

    // ... 

    // CHECKPOINT field defined
    private String checkpoint = null; 

    @Override
    public void open(Serializable checkpoint) throws NamingException, SQLException {

        // CHECKPOINT-based positioning through query value.
        // Initial position = whereclauseFrom, on restart set to checkpoint
        String queryVal = (String)(checkpoint == null ? whereclauseFrom : checkpoint);       

        if(Integer.parseInt(whereclauseFrom) == 5){
            sql = "SELECT * FROM " + tableName + " WHERE  CAST(REC AS INTEGER) <= "+ queryVal;
        }else if(Integer.parseInt(whereclauseFrom) == 6){
            sql = "SELECT * FROM " + tableName + " WHERE  CAST(REC AS INTEGER) >= "+ queryVal;
        }
        // ..
    }

    @Override
    public Object readItem() throws SQLException {
        if (listRecObj.size() == 0) {             
            return null;
        } else { 
            RecObj rec =null;           
            Iterator<RecObj> iter =listRecObj.iterator();
            while (iter.hasNext()) {               
                rec = iter.next();               
                // CHECKPOINT updated
                checkpoint = rec.getRec();
                if (Integer.parseInt(rec.getRec())  == 7) {                      
                    throw new IllegalStateException("Thrown Error");
                }
            }
        }
        // ...
    }

    @Override
    public Serializable checkpointInfo() {      
        // CHECKPOINT returned at end of chunk
        return checkpoint;
    }
}
person Scott Kurz    schedule 23.05.2016
comment
Обратите внимание, что в вашем примере есть еще одна проблема. Даже если вы внесли предлагаемые обновления, если вы потерпите неудачу с записью № 7, вы все равно начнете с самого начала после перезапуска 2-го раздела. Это связано с тем, что при количестве элементов 3, начиная с 6, первый фрагмент будет состоять из записей 6,7,8. Таким образом, вы на самом деле терпите неудачу в первом куске, а не во втором. - person Scott Kurz; 23.05.2016
comment
Спасибо, Скотт, все работает. Спасибо за быструю помощь. - person Srinivas K; 23.05.2016
comment
if(Integer.parseInt(queryVal) == 5 && arg0 == null){ sql = SELECT * FROM + tableName + WHERE CAST(REC AS INTEGER) ‹= + whereclauseFrom; } else if(Integer.parseInt(whereclauseFrom) == 6 && arg0 == null){ sql = SELECT * FROM + tableName + WHERE CAST(REC AS INTEGER) ›= + whereclauseFrom; } else { sql = SELECT * FROM + tableName + WHERE CAST(REC AS INTEGER) › + queryVal; } - person Srinivas K; 23.05.2016
comment
Отлично, рад помочь! Поскольку похоже, что вы новичок на сайте, позвольте мне отметить, что вы можете принять ответ, чтобы показать другим, что он ответил на ваш вопрос. Спасибо. - person Scott Kurz; 23.05.2016
comment
Скотт, у нас есть пример контрольной точки для пользовательского модуля записи, который реализует itemWriter. - person jcrshankar; 16.10.2018
comment
@jcrshankar, поскольку исходный вопрос задавался об ItemReader, почему бы вам не задать вопрос о писателе в новом отдельном вопросе с похожими тегами? Я поищу это. - person Scott Kurz; 16.10.2018