Ввод Logstash jdbc дублирует результаты

Я использую плагин logstash input jdbc для чтения двух (или более) баз данных и отправки данных в elasticsearch, а также использую kibana 4 для визуализации этих данных.

Это моя конфигурация logstash:

input {
  jdbc {
    type => "A"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table"
  }

jdbc {
    type => "B"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp2;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table"
  }
}
filter {

}
output {

    if [type] == "A" {
        elasticsearch {
            host => "localhost"
            protocol => http
            index => "logstash-servera-%{+YYYY.MM.dd}"
        }    
    }
    if [type] == "B" {
        elasticsearch {
            host => "localhost"
            protocol => http
            index => "logstash-serverb-%{+YYYY.MM.dd}"
        }    
    }

  stdout { codec => rubydebug }
}

Проблема в том, что каждый раз при запуске logstash он начинает сохранять все данные, которые уже есть в эластичном поиске.

После запуска с предложением where = date > '2015-09-10' я остановил logstash и снова запустил (с --debug) со "специальным параметром": sql_last_date. После запуска logstash он начинает показывать это в журнале:

←[36mExecuting JDBC query {:statement=>"SELECT \n\tSUBSTRING(R.RECEBEDOR, 1, 2)
AS 'DDD',\nCASE WHEN R.STATUS <>  'RCON' AND R.COD_RESPOSTA in (428,429,230,425,
430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO'  \n       W
HEN R.STATUS = 'RCON' THEN 'SUCESSO'\n\t   ELSE 'ERRO'\n   END AS 'TIPO_MENSAGEM
',\nAP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC
_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APL
ICACAO, RECEBEDOR, R.ID_OPERADORA, R.TIPO_PRODUTO \n\nFROM RECARGA R (NOLOCK)\nJ
OIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO \nwhere R.DT_RECARGA > :sql
_last_start\nORDER BY R.DT_RECARGA ASC", :parameters=>{:sql_last_start=>2015-09-
10 18:48:00 UTC}, :level=>:debug, :file=>"/DEV/logstash-1.5.4/vendor/bundle/jrub
y/1.9/gems/logstash-input-jdbc-1.0.0/lib/logstash/plugin_mixins/jdbc.rb", :line=
>"107", :method=>"execute_statement"}←[0m

На этот раз я побежал с «настоящим» утверждением:

SELECT 
    SUBSTRING(R.RECEBEDOR, 1, 2) AS 'DDD',
CASE WHEN R.STATUS <>  'RCON' AND R.COD_RESPOSTA in (428,429,230,425,430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO'  
       WHEN R.STATUS = 'RCON' THEN 'SUCESSO'
       ELSE 'ERRO'
   END AS 'TIPO_MENSAGEM',
AP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APLICACAO, RECEBEDOR, R.ID_OPERADORA

FROM RECARGA R (NOLOCK)
JOIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO 
where R.DT_RECARGA > :sql_last_start
ORDER BY R.DT_RECARGA ASC

Кто-нибудь знает, как это решить?

Спасибо!


person Rafael Lima    schedule 10.09.2015    source источник


Ответы (2)


sql_last_start теперь sql_last_valueпожалуйста, проверьте здесь специальный параметр sql_last_start теперь переименован в sql_last_value для большей ясности, поскольку он не только ограничен датой и временем, но также может иметь другой тип столбца. так что теперь решение может быть примерно таким

input {
jdbc {
     type => "A"
     jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-  jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
     jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
     jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
     jdbc_user => "user"
     jdbc_password => "pass"
     schedule => "5 * * * *"
     use_column_value => true
     tracking_column => date
     statement => "SELECT id, date, content, status from test_table WHERE date >:sql_last_value"
    #clean_run true means it will reset sql_last_value to zero or initial value if datatype is date(default is also false)
     clean_run =>false
   }
jdbc{
  #for type B....
  }
}

я тестировал с sql server db

пожалуйста, запустите в первый раз с clean_run=›ture, чтобы избежать ошибки типа данных, пока в процессе разработки мы можем иметь другое значение типа данных, хранящееся в переменной sql_last_value

person Raghavendra    schedule 03.05.2016
comment
Пожалуйста, предоставьте необходимую информацию для ответа на вопрос на сайте. - person sebenalern; 03.05.2016
comment
ранее я был просто комментарием, так как у меня не было достаточного рейтинга, чтобы прокомментировать ответ выше. теперь я написал полный ответ, так что это может помочь кому-то еще .. :) - person Raghavendra; 09.05.2016

По умолчанию ввод jdbc будет выполнять настроенный оператор SQL. В вашем случае ваше утверждение выбирает все в test_table. Вам нужно указать оператору SQL загружать данные только из последнего запуска ввода jdbc с использованием предопределенного sql_last_start в запросе SQL.

input {
  jdbc {
    type => "A"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table WHERE date > :sql_last_start"
  }

jdbc {
    type => "B"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp2;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table WHERE date > :sql_last_start"
  }
}

Кроме того, если по какому-либо совпадению одна и та же запись дважды загружается из вашей БД, и вы не хотите, чтобы дубликаты создавались на вашем сервере ES, вы также можете указать использовать идентификатор записи в качестве идентификатора документа в вашем выводе elasticsearch, таким образом, документ будет обновляться в ES и не дублироваться.

output {

    if [type] == "A" {
        elasticsearch {
            host => "localhost"
            protocol => http
            index => "logstash-servera-%{+YYYY.MM.dd}"
            document_id => "%{id}"       <--- same id as in DB
        }    
    }
    if [type] == "B" {
        elasticsearch {
            host => "localhost"
            protocol => http
            index => "logstash-serverb-%{+YYYY.MM.dd}"
            document_id => "%{id}"       <--- same id as in DB
        }    
    }

  stdout { codec => rubydebug }
}
person Val    schedule 10.09.2015
comment
Я пытался использовать это. В первый раз я использовал дату WHERE › «2015-09-09». Logstash получил все данные со вчерашнего дня до сегодняшнего 11:30. Когда я запустил второй раз (11:50), Logstash не получил никаких данных. И да, иметь больше данных в базе данных. - person Rafael Lima; 10.09.2015
comment
Единственное, что появляется в журнале: Запуск Logstash завершен, завершение работы Logstash завершено. - person Rafael Lima; 10.09.2015
comment
Можете ли вы запустить logstash с помощью --debug и предоставить дополнительные выходные данные? - person Val; 10.09.2015
comment
Я добавлю этот вывод на вопрос. - person Rafael Lima; 10.09.2015
comment
В вашем выводе я вижу, что sql_last_start было 2015-09-10 18:48:00 UTC. Не уверен, что это проблема, но на самом деле это не около 11:50. - person Val; 10.09.2015
comment
Я не уверен, почему это происходит. Данные, которые находятся в эластичном поиске, кажутся правильными. - person Rafael Lima; 11.09.2015
comment
Данные в ES, безусловно, правильные, но это больше проблема того, что logstash извлек или нет из вашей БД, поскольку вы сказали, что он не получил никаких данных в 11:50, что нормально, поскольку он пытался обновить данные после 18:48 (т.е. в будущем по сравнению с моментом запуска logstash в 11:50) - person Val; 12.09.2015