Не удается настроить новый поток для warc bolt

Я пытаюсь настроить новый поток, чтобы соединить болт Tika с болтом warc.

import com.digitalpebble.stormcrawler.tika.ParserBolt;
import com.digitalpebble.stormcrawler.warc.WARCHdfsBolt;

builder.setBolt("tika", new ParserBolt(), numWorkers)
  .localOrShuffleGrouping("shunt","tika");

WARCHdfsBolt warcbolt = getWarcBolt("XX");

builder.setBolt("warc", warcbolt, numWorkers)
  .localOrShuffleGrouping("tika",  "warc");

В определении Tika я изменил функцию outputDeclarerFields следующим образом, чтобы определить мой новый поток «warc»:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields("url", "content", "metadata", "text"));
  declarer.declareStream(StatusStreamName, new Fields("url", "metadata", "status"));
  declarer.declareStream("warc",   new Fields("url", "content", "metadata", "text"));
}

Однако когда я запускаю топологию в локальном режиме, я получаю:

Любая помощь будет высоко ценится!!

Обратите внимание: если я использую поток StatusStreamName ("status") для соединения болтов tika и warc, он работает нормально.

Спасибо,

Этьен

WARC генерируются из необработанного, не проанализированного контента. Вы должны подключить WARC к выходу сборщика вместо болта синтаксического анализатора.


person EJO    schedule 15.06.2017    source источник


Ответы (1)


Вам не нужно объявлять новый поток только для варка, вы можете просто подключить warc bolt к потоку по умолчанию, выходящему из болта Tika.

Я вижу в твоем коде

import com.digitalpebble.stormcrawler.tika.ParserBolt;

что будет означать, что вы полагаетесь на реализацию по умолчанию (которая не генерирует поток warc). Не могли бы вы забыть заменить это своей модифицированной реализацией?

Уважаемый Жюльен! Большое спасибо за полезные комментарии. Я хочу архивировать в файлы WARC только выбранные веб-страницы. Выбор основан на регулярном выражении, которое находится в анализаторе. Затем, если у меня есть совпадение, заархивируйте содержимое страницы (используя поток warc). Отсюда связь между синтаксическим анализатором и боевыми болтами. Этот подход кажется вам правильным? Или я должен поступить иначе для реализации этого? Еще раз спасибо! Этьен

person Julien Nioche    schedule 16.06.2017
comment
Привет, Этьен. Отредактировали мой ответ выше. Забыл, когда я изначально ответил, что болты синтаксического анализатора также генерируют двоичное содержимое, поэтому вам вообще не нужно изменять болт tika. Вы можете написать специальный болт для проверки содержимого метаданных и передать только те кортежи, у которых есть K / V, который вы установили во время синтаксического анализа. Если вы имеете дело только с HTML-страницами, придерживайтесь болта JsoupParser. Кстати, первоначальная проблема была из-за неправильного импорта. Если да, отметьте мой ответ как правильный. Спасибо! - person EJO; 17.06.2017
comment
14308 [main] WARN oasdsSlot - SLOT debian8: 1027 Запуск в состоянии EMPTY - нулевое назначение 14308 [main] WARN oasdsSlot - SLOT debian8: 1028 Запуск в состоянии EMPTY - нулевое назначение 14308 [main] WARN oasdsSlot - SLOT debian8: 1029 Запуск в состоянии EMPTY - присвоение null 14309 [main] INFO oaslAsyncLocalizer - Очистка неиспользуемых топологий в / tmp / a1e3b7f5-e251-40ae-a032-b0839ca103c8 / supervisor / stormdist 14318 [main] INFO oasdsSupervisor - Запуск supervisor с id 7c36-40ab-9f85-4b7751ed2d6a на хосте debian8. 15030 [main] WARN o.a.s.d.nimbus - исключение при отправке топологии. (имя топологии = 'xxCrawler') #error {: cause nil: via [{: type org.apache.storm.generated.InvalidTopologyException: message nil
: at [org.apache.storm.daemon.common $ validate_structure_BANG_ invoke common.clj 185]}]: trace [[org.apache.storm.daemon.common $ validate_structure_BANG_ invoke common.clj 185]
[org.apache.storm.daemon.common $ system_topology_BANG_ invoke common.clj 378] < br> [org.apache.storm.daemon.nimbus $ mk_reified_nimbus $ reify__10782 submitTopologyWithOpts nimbus.clj 1694]
[org.apache.storm.daemon.nimbus $ mk_reified_nimbus $ reify__10782 submitTopology 1726.

... Method.java 498] [clojure.lang.Reflector invokeMatchingMethod Refl ector.java 93] [clojure.lang.Reflector invokeInstanceMethod Reflector.java 28] [org.apache.storm.testing $ submit_local_topology вызывает testing.clj 310]
[org.apache.storm.LocalCluster $ _submitTopology вызывает LocalCluster.clj 49] [org.apache.storm.LocalCluster submitTopology nil -1]
[com.digitalpebble.stormcrawler.ConfigurableTopology submit ConfigurableTopology.java 76]
[com.digitalpebble.stormcrawler.ConfigurableTopology submit ConfigurableTopology 65] [java xx.xx.xx.xx.xxTopology run xxTopology.java 111]
[com.digitalpebble.stormcrawler.ConfigurableTopology start ConfigurableTopology.java 50] [xx.xx.xx.xx.xxTopology main xxTopology.java 53]]} 15035 [главная] ОШИБКА oassoazsNIOServerCnxnFactory - Поток потока [main, 5, main] умер org.apache.storm.generated.InvalidTopologyException: null в org.apache.storm.daemon.common $ validate_structure_BANG_.invoke (common.clj: 185) ~ [ядро-шторм-1. 1.0.jar: 1.1.0] в org.apache.storm.daemon.common $ system_topology_BANG_.invoke (common.clj: 378) ~ [storm-core-1.1.0.jar: 1.1.0] в org.apache. storm.daemon.nimbus $ mk_reified_nimbus $ reify__10782.submitTopologyWithOpts (nimbus.clj: 1694) ~ [storm-core-1.1.0.jar: 1.1.0] в org.apache.storm.daemon.nimbus $ mk_reified_nimbus.submit__107 (nimbus.clj: 1726) ~ [storm-core-1.1.0.jar: 1.1.0] в sun.reflect.NativeMethodAccessorImpl.invoke0 (собственный метод) ~ [?: 1.8.0_131] в sun.reflect.NativeMethodAccessorImpl. invoke (NativeMethodAccessorImpl.java:62) ~ [?: 1.8.0_131] в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) ~ [?: 1.8.0_131] в java.lang.reflectinvoke ( Method.java:498) ~ [?: 1.8.0_131] в clojure.lang.Reflector.invokeMatchingMethod (Reflector.java:93) ~ [clojure-1.7.0.jar :?] в clojure.lang.Reflector.invokeInstanceMethod ( Reflector.java:28) ~ [clojure- 1.7.0.jar :?] в org.apache.storm.testing $ submit_local_topology.invoke (testing.clj: 310) ~ [storm-core-1.1.0.jar: 1.1.0] в org.apache.storm. LocalCluster $ _submitTopology.invoke (LocalCluster.clj: 49) ~ [storm-core-1.1.0.jar: 1.1.0] в org.apache.storm.LocalCluster.submitTopology (Неизвестный источник) ~ [storm-core-1.1. 0.jar: 1.1.0] на com.digitalpebble.stormcrawler.ConfigurableTopology.submit (ConfigurableTopology.java:76) ~ [xx-crawler-1.1.jar :?] на com.digitalpebble.stormcrawler.ConfigurableTopology.submit (ConfigurableTopology.submit (ConfigurableTopology. java: 65) ~ [xx-1.1.jar :?] в xx.xx.xx.xx.xxTopology.run (xxTopology.java:111) ~ [xx-crawler-1.1.jar :?] в com.digitalpebble. stormcrawler.ConfigurableTopology.start (ConfigurableTopology.java:50) ~ [xx-crawler-1.1.jar :?] в xx.xx.xx.xx.xxTopology.main (xxTopology.java:53) ~ [xx-crawler-1.1 .банка:?] - person Julien Nioche; 18.06.2017