Я столкнулся с этой проблемой, когда пытался создать собственный источник события. Которая содержит очередь, которая позволяет другому моему процессу добавлять в нее элементы. Затем ожидайте, что мой шаблон CEP напечатает некоторые отладочные сообщения при совпадении.
Но что бы я ни добавлял в очередь, совпадений нет. Затем я замечаю, что очередь внутри mySource.run () всегда пуста. Это означает, что очередь, которую я использовал для создания экземпляра mySource, отличается от очереди внутри StreamExecutionEnvironment
. Если я изменю очередь на статическую, заставлю все экземпляры использовать одну и ту же очередь, все будет работать, как ожидалось.
DummySource.java
public class DummySource implements SourceFunction<String> {
private static final long serialVersionUID = 3978123556403297086L;
// private static Queue<String> queue = new LinkedBlockingQueue<String>();
private Queue<String> queue;
private boolean cancel = false;
public void setQueue(Queue<String> q){
queue = q;
}
@Override
public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<String> ctx)
throws Exception {
System.out.println("run");
synchronized (queue) {
while (!cancel) {
if (queue.peek() != null) {
String e = queue.poll();
if (e.equals("exit")) {
cancel();
}
System.out.println("collect "+e);
ctx.collectWithTimestamp(e, System.currentTimeMillis());
}
}
}
}
@Override
public void cancel() {
System.out.println("canceled");
cancel = true;
}
}
Итак, я копаюсь в исходном коде StreamExecutionEnvironment
. Внутри метода addSource (). Есть метод clean (), который выглядит так, будто заменяет экземпляр на новый.
Возвращает "очищенную от закрытия" версию данной функции.
Это почему? и почему это нужно сериализовать? Я также пытаюсь отключить чистое закрытие с помощью getConfig (). Результат все тот же. Мой экземпляр очереди не тот, который использует env.
Как мне решить эту проблему?