Как обрабатывать буферизованные потоки чтения-записи для одноранговых узлов в golang с помощью libp2p?

Я следую этому руководству:

https://github.com/libp2p/go-libp2p-examples/tree/master/chat-with-mdns

В краткой форме это:

  1. настраивает p2p-хост
  2. устанавливает функцию обработчика по умолчанию для входящих соединений (3. не обязательно)
  3. и открывает поток для подключающихся пиров:

stream, err := host.NewStream(ctx, peer.ID, protocol.ID(cfg.ProtocolID))

После этого создается переменная буферного потока/чтения-записи:

rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))

Теперь этот поток используется для отправки и получения данных между узлами. Это делается с помощью двух функций goroutine, которые имеют rw в качестве входных данных:

go writeData(rw) go readData(rw)

Мои проблемы:

  1. Я хочу отправить данные своим коллегам и мне нужна от них обратная связь: например. в rw есть вопрос и на него нужно ответить да/нет. Как я могу вернуть этот ответ и обработать его (включить какое-то взаимодействие)?

  2. Данные, которые я хочу отправить в rw, не всегда одинаковы. Иногда это строка, содержащая только имя, иногда строка, содержащая целый блок и т. д. Как отличить?

Я думал об этих решениях. Но я новичок в golang, так что, возможно, у вас есть лучший:

  • мне нужен новый поток для каждого другого контента: stream, err := host.NewStream(ctx, peer.ID, protocol.ID(cfg.ProtocolID))

  • мне нужно открыть больше буферизованных переменных rw для каждого другого контента: rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))

  • есть ли другие решения?

Спасибо за любую помощь, чтобы решить эту проблему!


person Chris    schedule 25.03.2019    source источник


Ответы (1)


Вот что readData делает из вашего туто:

func readData(rw *bufio.ReadWriter) {
    for {
        str, err := rw.ReadString('\n')
        if err != nil {
            fmt.Println("Error reading from buffer")
            panic(err)
        }

        if str == "" {
            return
        }
        if str != "\n" {
            // Green console colour:    \x1b[32m
            // Reset console colour:    \x1b[0m
            fmt.Printf("\x1b[32m%s\x1b[0m> ", str)
        }

    }
}

Он в основном читает поток, пока не найдет \n, который является символом новой строки, и выведет его на стандартный вывод.

writeData:

func writeData(rw *bufio.ReadWriter) {
    stdReader := bufio.NewReader(os.Stdin)

    for {
        fmt.Print("> ")
        sendData, err := stdReader.ReadString('\n')
        if err != nil {
            fmt.Println("Error reading from stdin")
            panic(err)
        }

        _, err = rw.WriteString(fmt.Sprintf("%s\n", sendData))
        if err != nil {
            fmt.Println("Error writing to buffer")
            panic(err)
        }
        err = rw.Flush()
        if err != nil {
            fmt.Println("Error flushing buffer")
            panic(err)
        }
    }
}

Он считывает данные со стандартного ввода, чтобы вы могли вводить сообщения, записывает их в rw и сбрасывает. Этот вид позволяет своего рода чат tty. Если он работает правильно, вы сможете запустить как минимум двух одноранговых узлов и общаться через стандартный ввод.

Вы не должны воссоздавать новый rw для нового контента. Вы можете повторно использовать существующий, пока не закроете его. Из кода туто для каждого нового пира создается новый rw.


Теперь поток tcp не работает как http-запрос с запросом и ответом, соответствующим этому запросу. Итак, если вы хотите что-то отправить и получить ответ на этот конкретный вопрос, вы можете отправить сообщение такого формата:

[8 bytes unique ID][content of the message]\n

И когда вы его получаете, вы анализируете его, готовите ответ и отправляете его в том же формате, чтобы вы могли сопоставлять сообщения, создавая своего рода связь типа запрос/ответ.

Вы можете сделать что-то вроде этого:

func sendMsg(rw *bufio.ReadWriter, id int64, content []byte) error {
        // allocate our slice of bytes with the correct size 4 + size of the message + 1
        msg := make([]byte, 4 + len(content) + 1)

        // write id 
        binary.LittleEndian.PutUint64(msg, uint64(id))

        // add content to msg
        copy(msg[13:], content)

        // add new line at the end
        msg[len(msg)-1] = '\n'

        // write msg to stream
        _, err = rw.Write(msg)
        if err != nil {
            fmt.Println("Error writing to buffer")
            return err
        }
        err = rw.Flush()
        if err != nil {
            fmt.Println("Error flushing buffer")
            return err
        }
        return nil
}

func readMsg(rw *bufio.ReadWriter) {
    for {
        // read bytes until new line
        msg, err := rw.ReadBytes('\n')
        if err != nil {
            fmt.Println("Error reading from buffer")
            continue
        }

        // get the id
        id := int64(binary.LittleEndian.Uint64(msg[0:8]))

        // get the content, last index is len(msg)-1 to remove the new line char
        content := string(msg[8:len(msg)-1])

        if content != "" {
            // we print [message ID] content
            fmt.Printf("[%d] %s", id, content)
        }

        // here you could parse your message
        // and prepare a response
        response, err := prepareResponse(content)
        if err != nil {
            fmt.Println("Err while preparing response: ", err)
            continue
        }

        if err := s.sendMsg(rw, id, response); err != nil {
            fmt.Println("Err while sending response: ", err)
            continue
        }
    }
}

Надеюсь это поможет.

person Francois    schedule 25.03.2019