Контекст:-
Задумывались ли вы когда-нибудь, как балансировщик нагрузки отслеживает доступные узлы и распределяет нагрузку 🤔. Это довольно просто — просто вести список доступных узлов 😁, но у вас могут возникнуть такие вопросы, как…. что, если узел выйдет из строя и восстановится позже 🤔, как балансировщик нагрузки узнает, что этот узел жив и т. д..
Решение:-
мы можем просто использовать HeartBeat и сообщить серверу/балансировщику нагрузки, что вы живы и готовы обслуживать трафик. Это похоже на то, как бьется наше сердце ❤️ и сигнализирует о том, что вы живы 😁.
HeartBeat :-
Вы должны регулярно сообщать серверу, что вы ЖИВЫ.
func (n *Node) SayYouAreAlive(ctx context.Context) {
defer n.wg.Готово()
// отправляем сигнал случайным образом
delay := []время.Duration{время.Second * 4, time.Second * 6 , время.Секунда * 8, время.Секунда * 11}
for {
select {
case ‹-ctx.Готово():
вернуть
case ‹-время.После(delay[rand. Intn(len(delay))]):
n.ServerCommunicationChan‹- n.Имя
}
}
Node Keeper:-
обновляет последнюю отметку времени сердцебиения и отслеживает доступные узлы. (узлы могут отправлять сердцебиение после восстановления, в этом случае он снова добавит узел в текущий список )
// Keeper обновляет последние тактовые импульсы от узлов
func (s *Server) Keeper(ctx context.Context) {
defer s.wg.Done()
for {
select {
case ‹-ctx.Done():
return
case x := ‹-s.ClientCommunicationChan:
_, ok := s.clients[x]< br /> if !ok {
fmt.Println («узел восстановлен», x, «в», time.Now())
s.clients[x] = Client{HeartBeat: HearBeat{Name : x, LastHeartBeatTime: time.Now()}}
} else {
fmt.Println("получено обновление от узла", x, "в", time.Now())
s.clients[x] = Client{HeartBeat: HearBeat{Name: x, LastHeartBeatTime: time.Now()}}
}
}
}
}
Удалить недоступные узлы:-
он продолжает проверять последний пульс узла, если разница временных меток больше, чем заданная конфигурация, тогда он будет рассматривать его как мертвый узел и удалить его из текущего списка.
func (s *Server) CheckForUnavailableClients(ctx context.Context) {
defer s.wg.Done()
for {
select {
case ‹-ctx.Done() :
return
case ‹-time.After(time.Second * 2):
for _, v := range s.clients {
diff := time.Now( ).Sub(v.HeartBeat.LastHeartBeatTime)
if diff › (time.Second * 5) {
fmt.Println("node", v.HeartBeat.Name, " недоступен | последний доступный в “, v.HeartBeat.LastHeartBeatTime)
delete(s.clients, v.HeartBeat.Name)
}
}
}
}
}
Я написал полный код ниже с комментариями. Пожалуйста, просмотрите его и дайте мне знать, если возникнут какие-либо ошибки/предложения/сомнения и т. д.…👍
package main import ( "context" "fmt" "math/rand" "strings" "sync" "time" ) const ( bufferSize = 10 ) type Client struct { HeartBeat HearBeat } type HearBeat struct { Name string LastHeartBeatTime time.Time } type Server struct { RegisterCli chan string clients map[string]Client // to communicate with nodes ClientCommunicationChan chan string wg *sync.WaitGroup } type Node struct { Name string // to communicate with server ServerCommunicationChan chan string wg *sync.WaitGroup } func main() { // ctx with timeout for server ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() // registration channel to register nodes/clients reg := make(chan string, bufferSize) // initialize new server instance svc := NewServer(ctx, reg) //new ctx with timeout for node ctxCLiOne, ctxCLiOneCancel := context.WithTimeout(context.Background(), time.Second*24) defer ctxCLiOneCancel() // register a node n1 := NewClient(ctxCLiOne, svc) n1.wg.Add(1) // push heart beat signal to server go n1.SayYouAreAlive(ctxCLiOne) go func() { n1.wg.Wait() }() ctxCLiTwo, ctxCLiTwoCancel := context.WithTimeout(context.Background(), time.Second*24) defer ctxCLiTwoCancel() // register a node n2 := NewClient(ctxCLiTwo, svc) n2.wg.Add(1) // push heart beat signal to server go n2.SayYouAreAlive(ctxCLiTwo) go func() { n2.wg.Wait() }() // to hold the main function time.Sleep(time.Second * 30) } func NewClient(ctx context.Context, svc *Server) *Node { var wg sync.WaitGroup nodeName := "node-" + randName() n := &Node{ ServerCommunicationChan: svc.ClientCommunicationChan, wg: &wg, Name: nodeName, } fmt.Println("new node is generating ", nodeName, "at ", time.Now()) // registers new node in server svc.RegisterCli <- nodeName return n } func NewServer(ctx context.Context, reg chan string) *Server { var wg sync.WaitGroup wg.Add(4) go func() { wg.Wait() }() svc := &Server{ ClientCommunicationChan: make(chan string, bufferSize), clients: make(map[string]Client, bufferSize), wg: &wg, RegisterCli: make(chan string, bufferSize), } go svc.RegisterNode(ctx, reg) go svc.CheckForUnavailableClients(ctx) go svc.Keeper(ctx) go svc.UpdateHeartBeat(ctx) return svc } // RegisterNode Registers new node func (s *Server) RegisterNode(ctx context.Context, reg chan string) { defer s.wg.Done() for { select { case <-ctx.Done(): return case x := <-reg: cli := HearBeat{Name: x, LastHeartBeatTime: time.Now()} fmt.Println("new node registered ", x, "at", time.Now()) s.clients[x] = Client{HeartBeat: cli} } } } func (n *Node) SayYouAreAlive(ctx context.Context) { defer n.wg.Done() // push signal randomly delay := []time.Duration{time.Second * 4, time.Second * 6, time.Second * 8, time.Second * 11} for { select { case <-ctx.Done(): return case <-time.After(delay[rand.Intn(len(delay))]): fmt.Println("sending heart beat ", n.Name, "at", time.Now()) n.ServerCommunicationChan <- n.Name } } } // UpdateHeartBeat acts as a bridge between server and node and informs server that node is alive func (s *Server) UpdateHeartBeat(ctx context.Context) { defer s.wg.Done() for { select { case <-ctx.Done(): return case x := <-s.ClientCommunicationChan: fmt.Println("updating heart beat ", x, "at", time.Now()) s.ClientCommunicationChan <- x } } } // Keeper updates the latest heart beat from nodes func (s *Server) Keeper(ctx context.Context) { defer s.wg.Done() for { select { case <-ctx.Done(): return case x := <-s.ClientCommunicationChan: _, ok := s.clients[x] if !ok { fmt.Println("node recovered ", x, "at", time.Now()) s.clients[x] = Client{HeartBeat: HearBeat{Name: x, LastHeartBeatTime: time.Now()}} } else { fmt.Println("got update from node ", x, "at", time.Now()) s.clients[x] = Client{HeartBeat: HearBeat{Name: x, LastHeartBeatTime: time.Now()}} } } } } // CheckForUnavailableClients removes the unavailable nodes func (s *Server) CheckForUnavailableClients(ctx context.Context) { defer s.wg.Done() for { select { case <-ctx.Done(): return case <-time.After(time.Second * 2): for _, v := range s.clients { diff := time.Now().Sub(v.HeartBeat.LastHeartBeatTime) if diff > (time.Second * 5) { fmt.Println("node ", v.HeartBeat.Name, " is unavailable | last available at ", v.HeartBeat.LastHeartBeatTime) delete(s.clients, v.HeartBeat.Name) } } } } } func randName() string { rand.Seed(time.Now().UnixNano()) smallLetters := "abcdefghijklmnopqrstuvwxyz" CapLetters := strings.ToUpper(smallLetters) digits := "1234567890" return fmt.Sprintf("%s%s%s%s%s", string(smallLetters[rand.Intn(26)]), string(CapLetters[rand.Intn(26)]), string(digits[rand.Intn(10)]), string(smallLetters[rand.Intn(26)]), string(CapLetters[rand.Intn(26)])) } sample output:- new node is generating node-sF0kD at 2023-02-02 20:10:20.695799 +0530 IST m=+0.000341921 new node is generating node-nC7wC at 2023-02-02 20:10:20.696029 +0530 IST m=+0.000572149 sending heart beat node-sF0kD at 2023-02-02 20:10:31.696547 +0530 IST m=+11.000759940 node recovered node-sF0kD at 2023-02-02 20:10:31.696647 +0530 IST m=+11.000860512 sending heart beat node-nC7wC at 2023-02-02 20:10:31.696492 +0530 IST m=+11.000705042 updating heart beat node-nC7wC at 2023-02-02 20:10:31.696713 +0530 IST m=+11.000926418 node recovered node-nC7wC at 2023-02-02 20:10:31.696752 +0530 IST m=+11.000965150 node node-sF0kD is unavailable | last available at 2023-02-02 20:10:31.69666 +0530 IST m=+11.000872839 node node-nC7wC is unavailable | last available at 2023-02-02 20:10:31.696761 +0530 IST m=+11.000974564 sending heart beat node-sF0kD at 2023-02-02 20:10:37.696879 +0530 IST m=+17.000912439 updating heart beat node-sF0kD at 2023-02-02 20:10:37.696922 +0530 IST m=+17.000955570 node recovered node-sF0kD at 2023-02-02 20:10:37.696931 +0530 IST m=+17.000964439 sending heart beat node-sF0kD at 2023-02-02 20:10:41.69816 +0530 IST m=+21.002073412 updating heart beat node-sF0kD at 2023-02-02 20:10:41.698296 +0530 IST m=+21.002208862 got update from node node-sF0kD at 2023-02-02 20:10:41.698328 +0530 IST m=+21.002241449 sending heart beat node-nC7wC at 2023-02-02 20:10:42.697381 +0530 IST m=+22.001264105 updating heart beat node-nC7wC at 2023-02-02 20:10:42.697432 +0530 IST m=+22.001315581 node recovered node-nC7wC at 2023-02-02 20:10:42.697469 +0530 IST m=+22.001353523 node node-sF0kD is unavailable | last available at 2023-02-02 20:10:41.698344 +0530 IST m=+21.002256878 node node-nC7wC is unavailable | last available at 2023-02-02 20:10:42.697478 +0530 IST m=+22.001360697