Использование уведомлений Kaa с функцией сбора данных Kaa

Я хочу использовать уведомление Kaa в сочетании с функцией сбора данных Kaa.

Как я могу реализовать свой сценарий для получения журналов?!

Сценарий:

1- Сервер отправляет уведомление конечной точке (используя идентификатор конечной точки), затем конечная точка отвечает отправкой данных с помощью функции сбора данных.

2- Сервер немного подождет и проверит метку времени последней записи приложения (я пытаюсь использовать приложение журнала MongoDB) для конечной точки (по идентификатору конечной точки).

3- "Добавить прослушиватель уведомлений, который прослушивает все уведомления:"

  kaaClient.addNotificationListener(new NotificationListener() {
        @Override
        public void onNotification(long id, SecurityAlert sampleNotification) {
            LOG.info("Notification from the topic with id [{}] and name [{}] received.", id, getTopic(id).getName());
            LOG.info("Notification body: {} \n", sampleNotification.getAlertMessage());
            LOG.info("Notification alert type: {} \n", sampleNotification.getAlertType());

            inputTopicIdMessage();
        }
    });

Требования к ведению журнала:

1- Добавлена ​​схема конфигурации:

{
"type": "record",
"name": "Configuration",
"namespace": "org.kaaproject.kaa.schema.sample",
"fields": [
    {
        "name": "samplingPeriod",
        "type": "int",
        "by_default": "1"
    }
]}

2- Добавлена ​​схема журнала:

{
"type":"record",
"name":"Data",
"namespace":"org.kaaproject.kaa.scheme.sample",
"fields":[
    {
        "name":"topicId",
        "type":"int"
    },
    {
        "name":"timeStamp",
        "type":"long"
    }
],
"displayName":"Logging scheme"

}

3- Добавлено приложение журнала с MongoDB.

После этого я хочу включить просмотр журналов с помощью следующих команд:

db.logs_ApplicationToken.find();

Обновлено (03.12.2017):

Я запустил следующий код, аналогичный коду сбора данных Kaa для кода уведомления Kaa:

    private static class MeasureSender implements Runnable {
    KaaClient kaaClient;

    MeasureSender(KaaClient kaaClient) {
        this.kaaClient = kaaClient;
    }

    @Override
    public void run() {
        sentRecordsCount.incrementAndGet();
        DataLogging record = generateTopicId();
        RecordFuture future = kaaClient.addLogRecord(record); // submit log record for sending to Kaa node
        LOG.info("Log record {} submitted for sending", record.toString());
        try {
            RecordInfo recordInfo = future.get(); // wait for log record delivery error
            BucketInfo bucketInfo = recordInfo.getBucketInfo();
            LOG.info("Received log record delivery info. Bucket Id [{}]. Record delivery time [{} ms].",
                    bucketInfo.getBucketId(), recordInfo.getRecordDeliveryTimeMs());
            confirmationsCount.incrementAndGet();
        } catch (Exception e) {
            LOG.error("Exception was caught while waiting for log's delivery report.", e);
        }
    }
}

private static DataLogging generateTopicId() {
    //TODO: Logic for get topicId
    return new DataLogging(topicId, System.currentTimeMillis());
}

После запуска приложения, когда я запускаю команду ниже на сервере Kaa:

db.logs_18693008741969774929.find();

Я получаю результат:

{ "_id" : ObjectId("5a228679ef540e07f3e73cd6"), "header" : { "endpointKeyHash" :{ "string" : "dXhbOD271Qtg9+FhxHXfrjE9bw4=" }, "applicationToken" : { "string" : "18693008741969774929" }, "headerVersion" : { "int" : 1 }, "timestamp" : { "long": NumberLong("1512212089541") }, "logSchemaVersion" : { "int" : 5 } }, "event": { "topicId" : 0, "timeStamp": 0 } }

В результате показывает, что "topicId" не получен. Потому что он равен 0.

Как вы можете видеть в последнем вышеприведенном методе private static DataLogging generateTopicId(){},
мне нужна некоторая логика для его выполнения.


Обновлено (06.12.2017):

public class NotificationDemo {

private static final Logger LOG = LoggerFactory.getLogger(NotificationDemo.class);
private static KaaClient kaaClient;
private static final int LOGS_DEFAULT_THRESHOLD = 1;
private static int samplePeriodInSeconds = 1;
private static volatile AtomicInteger sentRecordsCount = new AtomicInteger(0);
private static volatile AtomicInteger confirmationsCount = new AtomicInteger(0);
private JsonObjectParserImpl jsonObjectParser = new JsonObjectParserImpl();
private String StatusOfDevices = String.valueOf(jsonObjectParser.getGetStatusOfDevices());

private static ScheduledExecutorService executor;
private static ScheduledFuture<?> executorHandle;
/**
 * The list of all available notification
 * <p>
 * private static int samplePeriodInSeconds = 1;
 * private static volatile AtomicInteger sentRecordsCount = new AtomicInteger0t;
 * private static volatile AtomicInteger confirmationsCount = new AtomicInteger(0);
 * <p>
 * private static Random rand = new Random(o;pics.
 */
private static List<Topic> topics;
/**
 * Topics client subscribed
 */
private static List<Topic> subscribedTopics = new ArrayList<Topic>();

InputStreamReader inputStreamReader = new InputStreamReader(System.in);
BufferedReader keyboardInput = new BufferedReader(inputStreamReader);

public NotificationDemo() throws IOException, JSONException {
}

public static void main(String[] args) throws IOException, JSONException {

    NotificationDemo main = new NotificationDemo();
    main.config();

}

public void config() {

    LOG.info("Notification demo started");

    //kaaClient = Kaa.newClient(new DesktopKaaPlatformContext(), new SimpleKaaClientStateListener(), true);
    KaaClient kaaClient = Kaa.newClient(new DesktopKaaPlatformContext(), new SimpleKaaClientStateListener() {
        @Override
        public void onStarted() {
            LOG.info("--= Kaa client started =--");
        }

        @Override
        public void onStopped() {
            LOG.info("--= Kaa client stopped =--");
        }
    }, true);

    /*
    * Set record count strategy for uploading every log record as soon as it is created.
     */
    kaaClient.setLogUploadStrategy(new RecordCountLogUploadStrategy(LOGS_DEFAULT_THRESHOLD));
/*
 * A listener that listens to the notification topic list updates.
 */
    kaaClient.addConfigurationListener(new ConfigurationListener() {
        @Override
        public void onConfigurationUpdate(Configuration configuration) {
            LOG.info("--= Endpoint configuration was updated =--");
            displayConfiguration(configuration);

            Integer newSamplePeriod = configuration.getSamplingPeriod();
            if ((newSamplePeriod != null) && (newSamplePeriod > 0)) {
                changeMeasurementPeriod(kaaClient, newSamplePeriod);
            } else {
                LOG.warn("Sample period value (= {} in updated configuration is wrong, so ignore it.", newSamplePeriod);
            }
        }
    });
    NotificationTopicListListener topicListListener = new BasicNotificationTopicListListener();
    kaaClient.addTopicListListener(topicListListener);
    /*
     * Add a notification listener that listens to all notifications.
     */
    kaaClient.addNotificationListener(new NotificationListener() {
        @Override
        public void onNotification(long id, Notification notification) {
            LOG.info("Notification from the topic with id [{}] and name [{}] received.", id, getTopic(id).getName());
            LOG.info("Notification body: {} \n", notification.getMessage());
            String commands = (notification.getMessage());
            if (commands.equals("arm")) {
                System.out.println("The Status of Devices:" + StatusOfDevices);
            }
            inputTopicIdMessage();
        }
    });
    /*
     * Start the Kaa client and connect it to the Kaa server.
     */
    kaaClient.start();


    topics = kaaClient.getTopics();

        /*
         * List the obtained notification topics.
         */
    showTopics();

    Scanner scanner = new Scanner(System.in);
    while (scanner.hasNextLong())

    {
        long topicId = scanner.nextLong();
        if (getTopic(topicId) != null) {
            LOG.info("Subscribing to optional topic {}", topicId);
            subscribeTopic(topicId);
        } else {
            LOG.info("There is no input topic id. Please, input existing topic id.");
        }
    }
    /*
     * Stop listening to the notification topic list updates.
     */
    kaaClient.removeTopicListListener(topicListListener);

    unsubscribeOptionalTopics();

    /*
     * Stop the Kaa client and release all the resources which were in use.
     */
    kaaClient.stop();
    LOG.info("Notification demo stopped");
}

private static void changeMeasurementPeriod(KaaClient kaaClient, Integer newPeriod) {
    if (executorHandle != null) {
        executorHandle.cancel(false);
    }
    samplePeriodInSeconds = newPeriod;
    executorHandle = executor.scheduleAtFixedRate(new MeasureSender(kaaClient), 0, samplePeriodInSeconds, TimeUnit.SECONDS);
    LOG.info("Set new sample period = {} seconds.", samplePeriodInSeconds);
}

private static class MeasureSender implements Runnable {
    KaaClient kaaClient;

    MeasureSender(KaaClient kaaClient) {
        this.kaaClient = kaaClient;
    }

    @Override
    public void run() {
        sentRecordsCount.incrementAndGet();
        DataLogging record = generateTopicId();
        RecordFuture future = kaaClient.addLogRecord(record); // submit log record for sending to Kaa node
        LOG.info("Log record {} submitted for sending", record.toString());
        try {
            RecordInfo recordInfo = future.get(); // wait for log record delivery error
            BucketInfo bucketInfo = recordInfo.getBucketInfo();
            LOG.info("Received log record delivery info. Bucket Id [{}]. Record delivery time [{} ms].",
                    bucketInfo.getBucketId(), recordInfo.getRecordDeliveryTimeMs());
            confirmationsCount.incrementAndGet();
        } catch (Exception e) {
            LOG.error("Exception was caught while waiting for log's delivery report.", e);
        }
    }
}

private static DataLogging generateTopicId() {
    Integer topicId = generateTopicId().getTopicId();
    return new DataLogging(topicId, System.currentTimeMillis());
}

private static void inputTopicIdMessage() {
    LOG.info("\nPlease, type topic ID in order to subscribe to ones or type any text to exit: \n");
}

private static void displayConfiguration(org.kaaproject.kaa.schema.sample.Configuration configuration) {
    LOG.info("Configuration = {}", configuration.toString());
}

private static void showTopics() {
    if (topics == null || topics.isEmpty()) {
        LOG.info("Topic list is empty");
        return;
    }

    LOG.info("Available topics:");
    for (Topic topic : topics) {
        LOG.info("Topic id: {}, name: {}, type: {}", topic.getId(), topic.getName(), topic.getSubscriptionType());
    }

    LOG.info("Subscribed on topics:");
    for (Topic t : getOneTypeTopics(SubscriptionType.MANDATORY_SUBSCRIPTION)) {
        LOG.info("Topic id: {}, name: {}, type: {}", t.getId(), t.getName(), t.getSubscriptionType().name());
    }
    /*
     * Optional topics
     */
    if (!subscribedTopics.isEmpty()) {
        for (Topic t : subscribedTopics) {
            LOG.info("Topic id: {}, name: {}, type: {}", t.getId(), t.getName(), t.getSubscriptionType().name());
        }
    }
    inputTopicIdMessage();
}


private static List<Topic> getOneTypeTopics(SubscriptionType type) {
    List<Topic> res = new ArrayList<>();
    for (Topic t : NotificationDemo.topics) {
        if (t.getSubscriptionType() == type) {
            res.add(t);
        }
    }
    return res;
}

private static void subscribeTopic(long topicId) {
    try {
        subscribedTopics.add(getTopic(topicId));
        kaaClient.subscribeToTopic(topicId, true);
    } catch (UnavailableTopicException e) {
        e.printStackTrace();
    }
    inputTopicIdMessage();
}

private static Topic getTopic(long id) {
    for (Topic t : topics)
        if (t.getId() == id)
            return t;
    return null;
}

private static void sleepForSeconds(int seconds) {
    try {
        TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

private static void unsubscribeOptionalTopics() {
    List<Topic> topics = getOneTypeTopics(SubscriptionType.OPTIONAL_SUBSCRIPTION);

    for (Topic t : subscribedTopics) {
        try {
            kaaClient.unsubscribeFromTopic(t.getId());
        } catch (UnavailableTopicException e) {
            // if not subscribe
        }
    }
}


private static void waitForAnyInput() {
    try {
        System.in.read();
    } catch (IOException e) {
        LOG.warn("Error happens when waiting for user input.", e);
    }
}

/**
 * A listener that tracks the notification topic list updates
 * and subscribes the Kaa client to every new topic available.
 */
private static class BasicNotificationTopicListListener implements NotificationTopicListListener {
    @Override
    public void onListUpdated(List<Topic> list) {
        LOG.info("Topic list was updated:");
        topics.clear();
        topics.addAll(list);

        showTopics();
    }
}

}


person M. Asiyaban    schedule 02.12.2017    source источник
comment
Мне нужно больше информации, чтобы помочь вам. Какова ваша схема уведомлений? Что такое SecurityAlert? Что topicId вы ожидаете? Не могли бы вы опубликовать код вашего приложения? (Трудно понять весь поток из пары фрагментов.)   -  person Alexey Shmalko    schedule 05.12.2017
comment
Я изменил код. Я просто пытаюсь понять взаимосвязь между уведомлением и сбором данных (ведение журнала). 1. Схема уведомления: { тип: запись, имя: пример уведомления, пространство имен: org.kaaproject.kaa.schema.sample.notification, поля: [ { имя: сообщение, тип: строка } ] } 2. Об идентификаторе темы; Я хочу собрать данные. У вас есть хорошее предложение, как это сделать? 3. Я опубликовал код. (см. новое обновление.)   -  person M. Asiyaban    schedule 06.12.2017


Ответы (1)


Я постараюсь ответить, хотя я не совсем уверен. (Совершенно неясно, чего вы пытаетесь достичь.)

Прежде всего, я должен сказать, что регистрация данных и уведомления — совершенно ортогональные функции. То есть они никак не взаимодействуют.

В результате показывает, что "topicId" не получен. Потому что он равен 0.

Вероятно, это вызвано тем, что ваше приложение вообще не устанавливает topicId. (например, вызов new DataLogging() без аргументов.)

  1. О "topId"; Я хочу собрать данные. У вас есть хорошее предложение, как это сделать?

С точки зрения Каа, это topicId — просто целочисленное поле, и серверу все равно — он сохранит в монго все, что вы отправляете. Если ваша цель — собрать любые данные, просто поместите туда случайное целое число.

private static DataLogging generateTopicId() {
    Integer topicId = 42;
    return new DataLogging(topicId, System.currentTimeMillis());
}
person Alexey Shmalko    schedule 14.12.2017