Отслеживание Hazelcast через сыщика

Интересно, есть ли какая-то интеграция сыщика в hazelcast. В моем приложении у меня есть очередь hazelcast с прослушивателями событий, настроенными для событий addEntity, и проблема в том, что диапазон кажется нарушенным после срабатывания этого прослушивателя. Я знаю, что есть интеграция сыщика для ExecutorService, но есть ли что-то подобное для com.hazelcast.core.ItemListener? Заранее спасибо.

UPD: Подробности. У меня есть пример службы, которая использует очередь spring-cloud-sleth и hazelcast.

package com.myapp;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ItemEvent;
import com.hazelcast.core.ItemListener;
import java.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.sleuth.DefaultSpanNamer;
import org.springframework.cloud.sleuth.TraceRunnable;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class SomeService {

private HazelcastInstance hazelcastInstance =
    Hazelcast.newHazelcastInstance();
private IQueue<String> queue = hazelcastInstance.getQueue("someQueue");

private Tracer tracing;


@Autowired(required = false)
public void setTracer(Tracer tracer) {
    this.tracing = tracer;
}


{
    queue.addItemListener(new ItemListener<String>() {
    @Override
    public void itemAdded(ItemEvent<String> item) {
        log.info("This is span");
        log.info("This is item " + item);
    }

    @Override
    public void itemRemoved(ItemEvent<String> item) {
    }
    }, true);
}

@Async
public void processRequestAsync() {
    log.info("Processing async");
    log.info("This is span");
    Executors.newSingleThreadExecutor().execute(
        new TraceRunnable(tracing, new DefaultSpanNamer(), () -> log.info("Some Weird stuff")));
    queue.add("some stuff");
}

}

и как только я вызываю processRequestAsync, я получаю следующий вывод в консоли:

INFO [-,792a6c3ad3e91280,792a6c3ad3e91280,false] 9996 --- [nio-8080-exec-2] com.myapp.SomeController            : Incoming request!
INFO [-,792a6c3ad3e91280,792a6c3ad3e91280,false] 9996 --- [nio-8080-exec-2] com.myapp.SomeController            : This is current span [Trace: 792a6c3ad3e91280, Span: 792a6c3ad3e91280, Parent: null, exportable:false]
INFO [-,792a6c3ad3e91280,7d0c06d3e24a7ba1,false] 9996 --- [cTaskExecutor-1] com.myapp.SomeService               : Processing async
INFO [-,792a6c3ad3e91280,7d0c06d3e24a7ba1,false] 9996 --- [cTaskExecutor-1] com.myapp.SomeService               : This is span
INFO [-,792a6c3ad3e91280,8a2f0a9028f44979,false] 9996 --- [pool-1-thread-1] com.myapp.SomeService               : Some Weird stuff
INFO [-,792a6c3ad3e91280,7d0c06d3e24a7ba1,false] 9996 --- [cTaskExecutor-1] c.h.i.p.impl.PartitionStateManager       : [10.236.31.22]:5701 [dev] [3.8.3] Initializing cluster partition table arrangement...
INFO [-,,,] 9996 --- [e_1_dev.event-4] com.myapp.SomeService               : This is span
INFO [-,,,] 9996 --- [e_1_dev.event-4] com.myapp.SomeService               : This is item ItemEvent{event=ADDED, item=some stuff, member=Member [10.236.31.22]:5701 - b830dbf0-0977-42a3-a15d-800872221c84 this} 

Итак, похоже, что диапазон был нарушен, когда мы перейдем к коду eventListener, и мне интересно, как я могу распространять или создавать новый диапазон внутри очереди hazelcast


person Oleg K.    schedule 25.07.2018    source источник
comment
Не то, что я знаю о   -  person Marcin Grzejszczak    schedule 25.07.2018
comment
Не могли бы вы рассказать подробнее о sleuth, о котором вы упомянули? Какую функциональность вы ищете?   -  person Alparslan Avci    schedule 25.07.2018


Ответы (2)


Sleuth (на момент написания статьи) не поддерживает Hazelcast.

Решение более общее, чем просто Hazelcast: вам нужно передать brave.Span Zipkin между клиентом и сервером, но brave.Span не является сериализуемым.

Zipkin предоставляет средства, с помощью которых можно обойти это.

Учитывая brave.Span на клиенте, вы можете преобразовать его в java.util.Map:

Span span = ...
Map<String, String> map = new HashMap<>();

tracing.propagation().injector(Map<String, String>::put).inject(span.context(), map);

На сервере вы можете преобразовать java.util.Map обратно в brave.Span:

Span span = tracer.toSpan(tracing.propagation().extractor(Map<String, String>::get).extract(map).context())

Использование java.util.Map, очевидно, может быть заменено по мере необходимости, но принцип тот же.

person Nick Holt    schedule 28.02.2020

Я не могу заставить его работать для ItemListeners. Я думаю, нам нужно иметь возможность обернуть StripedExecutor Hazelcast во что-то вроде LazyTraceThreadPoolTaskExecutor (но такое, которое принимает простой делегат Executor вместо ThreadPoolTaskExecutor).

Для EntryProcessors я взломал это вместе. Фабрика для создания EntryProcessors, передавая текущий диапазон из потока, который создает процессор. Когда процессор работает, он использует этот диапазон в качестве родительского диапазона в потоке исполнителя.

@Component
public class SleuthedEntryProcessorFactory {

    private final Tracer tracer;

    public SleuthedEntryProcessorFactory(Tracer tracer) {
        this.tracer = tracer;
    }

    /**
     * Create an entry processor that will continue the Sleuth span of the thread 
     * that invokes this method.
     * Mutate the given value as required. It will then be set on the entry.
     *
     * @param name name of the span
     * @param task task to perform on the map entry
     */
    public <K, V, R> SleuthedEntryProcessor<K, V, R> create(String name, Function<V, R> task) {
        return new SleuthedEntryProcessor<>(name, tracer.getCurrentSpan(), task);
    }
}

/**
 * Copies the MDC context (which contains Sleuth's trace ID, etc.) and the current span
 * from the thread that constructs this into the thread that runs this.

 * @param <K> key type
 * @param <V> value type
 * @param <R> return type
 */
@SpringAware
public class SleuthedEntryProcessor<K, V, R> extends AbstractEntryProcessor<K, V> {

    private final Map<String, String> copyOfContextMap;
    private final String name;
    private final Span parentSpan;
    private final Function<V, R> task;
    private transient Tracer tracer;

    public SleuthedEntryProcessor(String name, Span parentSpan, Function<V, R> task) {
        this(name, parentSpan, task, true);
    }

    public SleuthedEntryProcessor(
            String name, Span parentSpan, Function<V, R> task, boolean applyOnBackup) {
        super(applyOnBackup);
        this.name = name + "Hz";
        this.parentSpan = parentSpan;
        this.task = task;
        copyOfContextMap = MDC.getCopyOfContextMap();
    }

    @Override
    public final R process(Map.Entry<K, V> entry) {
        if (nonNull(copyOfContextMap)) {
            MDC.setContextMap(copyOfContextMap);
        }

        Span span = tracer.createSpan(toLowerHyphen(name), parentSpan);
        try {
            V value = entry.getValue();
            // The task mutates the value.
            R result = task.apply(value);
            // Set the mutated value back onto the entry.
            entry.setValue(value);
            return result;

        } finally {
            MDC.clear();
            tracer.close(span);
        }
    }

    @Autowired
    public void setTracer(Tracer tracer) {
        this.tracer = tracer;
    }
}

Затем передайте EntryProcessor в свой IMap следующим образом:

Function<V, R> process = ...;
SleuthedEntryProcessor<K, V, R> entryProcessor = sleuthedEntryProcessorFactory.create(label, process);
Map<K, R> results = iMap.executeOnEntries(entryProcessor);
person whistling_marmot    schedule 28.08.2018