flink SourceFunction ‹› заменяется в StreamExecutionEnvironment.addSource ()?

Я столкнулся с этой проблемой, когда пытался создать собственный источник события. Которая содержит очередь, которая позволяет другому моему процессу добавлять в нее элементы. Затем ожидайте, что мой шаблон CEP напечатает некоторые отладочные сообщения при совпадении.

Но что бы я ни добавлял в очередь, совпадений нет. Затем я замечаю, что очередь внутри mySource.run () всегда пуста. Это означает, что очередь, которую я использовал для создания экземпляра mySource, отличается от очереди внутри StreamExecutionEnvironment. Если я изменю очередь на статическую, заставлю все экземпляры использовать одну и ту же очередь, все будет работать, как ожидалось.

DummySource.java

    public class DummySource implements SourceFunction<String> {

    private static final long serialVersionUID = 3978123556403297086L;
//  private static Queue<String> queue = new LinkedBlockingQueue<String>();
    private Queue<String> queue;
    private boolean cancel = false;

    public void setQueue(Queue<String> q){
        queue = q;
    }   

    @Override
    public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<String> ctx)
            throws Exception {
        System.out.println("run");
        synchronized (queue) {          
            while (!cancel) {
                if (queue.peek() != null) {
                    String e = queue.poll();
                    if (e.equals("exit")) {
                        cancel();
                    }
                    System.out.println("collect "+e);
                    ctx.collectWithTimestamp(e, System.currentTimeMillis());
                }
            }
        }
    }

    @Override
    public void cancel() {
        System.out.println("canceled");
        cancel = true;
    }
}

Итак, я копаюсь в исходном коде StreamExecutionEnvironment. Внутри метода addSource (). Есть метод clean (), который выглядит так, будто заменяет экземпляр на новый.

Возвращает "очищенную от закрытия" версию данной функции.

Это почему? и почему это нужно сериализовать? Я также пытаюсь отключить чистое закрытие с помощью getConfig (). Результат все тот же. Мой экземпляр очереди не тот, который использует env.

Как мне решить эту проблему?


person Maxi Wu    schedule 14.08.2018    source источник


Ответы (1)


Метод clean(), используемый в функциях во Flink, в основном предназначен для обеспечения сериализации Function (например, SourceFunction, MapFunction). Flink будет сериализовать эти функции и распределить их по узлам задач для их выполнения.

Для простых переменных в основном коде Flink, таких как int, вы можете просто сослаться на них в своей функции. Но для больших или несериализуемых лучше использовать широковещательную передачу и функцию расширенного источника. См. https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables

person BrightFlow    schedule 14.08.2018
comment
моя программа flink не параллельна. Но широковещательная переменная решает мою проблему. Означает ли параллель, когда я использую stream.keyBy ()? чем каждый ключ будет иметь свой узел задачи? - person Maxi Wu; 14.08.2018
comment
Вероятно, parallel, что вы здесь упомянули, означает работу в кластере. Независимо от того, как вы запускаете задание Flink, в автономном режиме (выполняется на локальном компьютере с узлами Master и Task) или в кластерном режиме (Flink Cluster или YARN Cluster), Flink всегда сериализует functions, распределяет их и выполняет в Task. - person BrightFlow; 15.08.2018
comment
после повторного чтения документа похоже, что широковещательная передача используется с DataSet, и широковещательная переменная должна быть известна перед отправкой задачи на узел. Возможно, мне придется заглянуть в TaskManager для более сложного сценария. - person Maxi Wu; 23.08.2018