Десериализация событий Flume на C# через Avro

Я настроил службу Flume, которая может отслеживать Netcat или отслеживать журнал с Exec в качестве источника и тому подобное. Я использую память в качестве канала и Avro в качестве приемника (Thrift указан в документах, но, похоже, не работает в Flume 1.3 или 1.4)

Я настроил сервер сокетов С# для получения сообщений и получаю поток байтов. Если я прочитаю их с помощью Encoding.UTF8.GetString(buffer), то увижу что-то вроде:

"\0\0\0\0\0\0\0\0\00�����Tt������5\ne\0�����Tt������5\ne\0\0appendBatch\0\0�\0�127.0.0.1 - - [12/Nov/2013:22:42:50 +0000] \"GET /docs/appdev/index.html HTTP/1.1\" 200 7645\0�127.0.0.1 - - [12/Nov/2013:22:44:07 +0000] \"GET /docs/appdev/introduction.html HTTP/1.1\" 200 8619\0�127.0.0.1 - - [12/Nov/2013:22:44:09 +0000] \"GET /docs/appdev/installation.html HTTP/1.1\" 200 9045\0�127.0.0.1 - - [12/Nov/2013:22:44:12 +0000] \"GET /docs/appdev/deployment.html HTTP/1.1\" 200 18800\0�127.0.0.1 - - [12/Nov/2013:22:49:07 +0000] \"GET /docs/appdev/source.html HTTP/1.1\" 200 24554\0�127.0.0.1 - - [12/Nov/2013:22:50:38 +0000] \"GET /docs/appdev/processes.html HTTP/1.1\" 200 30743\0�127.0.0.1 - - [12/Nov/2013:22:51:39 +0000] \"GET /docs/appdev/sample/ HTTP/1.1\" 200 1852\0�0:0:0:0:0:0:0:1 - - [12/Nov/2013:22:51:48 +0000] \"GET /sample HTTP/1.1\" 404 963\0�0:0:0:0:0:0:0:1 - - [12/Nov/2013:22:51:48 +0000] \"GET /favicon.ico HTTP/1.1\" 200 21630\0�0:0:0:0:0:0:0:1 - - [12/Nov/2013:23:02:13 +0000] \"GET /sample HTTP/1.1\" 404 963\0"

Итак, очевидно, я передаю данные, но я хотел бы правильно их десериализовать, а не выполнять какие-то извлечения регулярных выражений. Я вижу, что есть официальная библиотека Avro C# и есть библиотека Microsoft Hadoop с библиотеками десериализации. Я создал локальный объект для десериализации:

[DataContract]
public class AvroEvent
{
    [DataMember]
    public byte[] Body { get; set; }
}

и попробуйте десериализовать это:

  client = serverSocket.EndAccept(result);
  var myNetworkStream = new NetworkStream(client);
  myNetworkStream.Read(buffer, 0, size);
  var avro = new AvroSerializer(typeof(AvroEvent));
  var deser = avro.Deserialize(myNetworkStream);

то я получаю эту ошибку:

  System.InvalidOperationException was unhandled
  HResult=-2146233079
  Message=Unexpected number of bytes.
  Source=Microsoft.Hadoop.Avro

Я почти наверняка делаю все это неправильно, и я уверен, что люди будут советовать мне не использовать C#, но у меня почти закончились исходники в Google, поэтому, если кто-то еще на самом деле сделал это и направьте меня в правильном направлении, я был бы очень благодарен

Тоби


person TobyEvans    schedule 13.11.2013    source источник
comment
Можете ли вы уточнить, что делает вызов myNetworkStream.Read? В настоящее время похоже, что вы пытаетесь отбросить size байта перед десериализацией. Если это ваше намерение, я бы добавил комментарий на этот счет.   -  person chwarr    schedule 13.11.2013
comment
ох, не уверен, я ничего не понимаю в этом, мне нужен поток байтов, чтобы передать десериализатору Avro из сокета. Я попробую еще раз, но может случиться так, что после сортировки потока могут возникнуть проблемы с Avro/C#. Или нет ...   -  person TobyEvans    schedule 13.11.2013
comment
моя реальная потребность заключалась в том, чтобы передавать данные регистрации в реальном времени на клиент .net, где я был бы наиболее продуктивным в управлении тем, что с ними делать - оказывается, ответом является logstash, а не Flume ....   -  person TobyEvans    schedule 13.11.2013


Ответы (1)


Flume использует механизм RPC для передачи событий. Если выбран Avro, Flume полагается на Avro RPC, который не поддерживается библиотекой Microsoft Avro (как указано в < strong>Что нового), так как он предназначен только для использования в качестве среды сериализации.

С технической точки зрения метод Deserialize() ожидает, что поток будет иметь следующие данные (в битах):

11[size of byte array encoded in variable-length zig zag][actual byte] (*)

Ошибка, которую вы получаете, вероятно, связана с тем, что полученные данные имеют другой проводной формат.


* Начальный 1 необходим, поскольку версия 0.8.4951.5418 библиотека инкапсулирует каждый тип в объединение нулевого (0) и типа (1), поэтому первый 1 предназначен для записи AvroEvent, а второй 1 — для поля Body. Это поведение можно настроить в последней версии 1.1.0.5.

person Y.H.    schedule 28.02.2014