Декодируйте схему событий debeuzium в значимую структуру данных в golang

Схема Debezium представляет собой строку (json):

{
   "type":"record",
   "name":"Envelope",
   "namespace":"datapipe.inventory.customers",
   "fields":[
      {
         "name":"before",
         "type":[
            "null",
            {
               "type":"record",
               "name":"Value",
               "fields":[
                  {
                     "name":"id",
                     "type":"int"
                  },
                  {
                     "name":"first_name",
                     "type":"string"
                  },
                  {
                     "name":"last_name",
                     "type":"string"
                  },
                  {
                     "name":"email",
                     "type":"string"
                  }
               ],
               "connect.name":"datapipe.inventory.customers.Value"
            }
         ],
         "default":null
      },
      {
         "name":"after",
         "type":[
            "null",
            "Value"
         ],
         "default":null
      },
      {
         "name":"source",
         "type":{
            "type":"record",
            "name":"Source",
            "namespace":"io.debezium.connector.mysql",
            "fields":[
               {
                  "name":"version",
                  "type":"string"
               },
               {
                  "name":"connector",
                  "type":"string"
               },
               {
                  "name":"name",
                  "type":"string"
               },
               {
                  "name":"ts_ms",
                  "type":"long"
               },
               {
                  "name":"snapshot",
                  "type":[
                     {
                        "type":"string",
                        "connect.version":1,
                        "connect.parameters":{
                           "allowed":"true,last,false"
                        },
                        "connect.default":"false",
                        "connect.name":"io.debezium.data.Enum"
                     },
                     "null"
                  ],
                  "default":"false"
               },
               {
                  "name":"db",
                  "type":"string"
               },
               {
                  "name":"table",
                  "type":[
                     "null",
                     "string"
                  ],
                  "default":null
               },
               {
                  "name":"server_id",
                  "type":"long"
               },
               {
                  "name":"gtid",
                  "type":[
                     "null",
                     "string"
                  ],
                  "default":null
               },
               {
                  "name":"file",
                  "type":"string"
               },
               {
                  "name":"pos",
                  "type":"long"
               },
               {
                  "name":"row",
                  "type":"int"
               },
               {
                  "name":"thread",
                  "type":[
                     "null",
                     "long"
                  ],
                  "default":null
               },
               {
                  "name":"query",
                  "type":[
                     "null",
                     "string"
                  ],
                  "default":null
               }
            ],
            "connect.name":"io.debezium.connector.mysql.Source"
         }
      },
      {
         "name":"op",
         "type":"string"
      },
      {
         "name":"ts_ms",
         "type":[
            "null",
            "long"
         ],
         "default":null
      },
      {
         "name":"transaction",
         "type":[
            "null",
            {
               "type":"record",
               "name":"ConnectDefault",
               "namespace":"io.confluent.connect.avro",
               "fields":[
                  {
                     "name":"id",
                     "type":"string"
                  },
                  {
                     "name":"total_order",
                     "type":"long"
                  },
                  {
                     "name":"data_collection_order",
                     "type":"long"
                  }
               ]
            }
         ],
         "default":null
      }
   ],
   "connect.name":"datapipe.inventory.customers.Envelope"
}

Я хочу декодировать эту схему в осмысленную структуру данных, чтобы я мог выполнять операции с этими данными.

Преобразование с использованием карты и цикла for происходит медленно и требует большого количества манипуляций, как показано ниже.

for k1, v2 := range m["fields"].([]interface{}) {
        fmt.Println(k1)
        v3 := v2.(map[string]interface{})
        for _, value := range v3 {
            if value == "before" {
                vtype := v3["type"].([]interface{})

                for kk, vtyp := range vtype {
                    fmt.Printf("kk %v, vtyp %v\n", kk, vtyp)
                    // for k4, v4 := range vtyp.([]interface{}) {
                    //  fmt.Printf("key4: %v, value4: %v\n", k4, v4)
                    // }

                }


            }
        }
    }

Есть ли тип Debezium на Голанге? Пожалуйста, предложите. Я проверил документацию, но создание типа с таким количеством гнезд очень запутанно.

Связанные https://github.com/riferrei/srclient/issues/13


person Alok Kumar Singh    schedule 24.08.2020    source источник
comment
Похоже на json, объявите свой собственный тип структуры Debezium и распараллелите его.   -  person mkopriva    schedule 24.08.2020
comment
Доступен ли тип Debezium на Golang? Нет, не на языке и не в стандартной библиотеке. Вы можете найти стороннюю реализацию, если у вас есть проблемы с написанием собственного типа структуры. Также есть инструмент, который сгенерирует тип структуры Go из json для вас, используйте поисковую систему или подождите немного, кто-то обязательно это увидит и опубликует ссылку в комментарии.   -  person mkopriva    schedule 24.08.2020
comment
Построение типа структуры debezium было для меня очень запутанным, так как у него было много вложенных типов.   -  person Alok Kumar Singh    schedule 24.08.2020
comment
Нашел этот mholt.github.io/json-to-go при попытке   -  person Alok Kumar Singh    schedule 24.08.2020
comment
Пытался использовать сгенерированную структуру, не помогло, получил эту ошибку panic: json: cannot unmarshal object into Go struct field Fields.fields.type of type []interface {} play.golang.org/p/RCIvR0wGjvZ< /а>   -  person Alok Kumar Singh    schedule 24.08.2020
comment
Автор: Ризван play.golang.org/p/BZkbWFDrqps работает! json-to-go не заботится о вложенной структуре.   -  person Alok Kumar Singh    schedule 24.08.2020
comment
еще один rk play.golang.org/p/TXqhcBVa7sq   -  person Alok Kumar Singh    schedule 25.08.2020
comment
один со всеми картами play.golang.org/p/6jSG4PFTEBv   -  person Alok Kumar Singh    schedule 25.08.2020


Ответы (1)


package main

import (
    "fmt"
    "encoding/json"
)

type DebeziumSchema struct {
    Type        string         `json:"type"`
    Name        string         `json:"name"`
    Namespace   string         `json:"namespace"`
    Fields      []SchemaField  `json:"fields"`
    ConnectName string         `json:"connect.name"`
}

type SchemaField struct {
    Name    string          `json:"name"`
    Type    interface{}     `json:"type"`
    Default interface{}     `json:"default,omitempty"`
}

func main() {
    schema := `{"type":"record","name":"Envelope","namespace":"datapipe.inventory.customers","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":"int"},{"name":"first_name","type":"string"},{"name":"last_name","type":"string"},{"name":"email","type":"string"}],"connect.name":"datapipe.inventory.customers.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mysql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"table","type":["null","string"],"default":null},{"name":"server_id","type":"long"},{"name":"gtid","type":["null","string"],"default":null},{"name":"file","type":"string"},{"name":"pos","type":"long"},{"name":"row","type":"int"},{"name":"thread","type":["null","long"],"default":null},{"name":"query","type":["null","string"],"default":null}],"connect.name":"io.debezium.connector.mysql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"datapipe.inventory.customers.Envelope"}`

    var d DebeziumSchema

    err := json.Unmarshal([]byte(schema), &d)
    if err != nil {
        panic(err)
    }

    columns := make(map[string]string)
    
    for _, field := range d.Fields {
        if field.Name != "before" {
            continue
        }
        
        for _, k := range field.Type.([]interface{}) {
            switch k.(type) {
            case map[string]interface{}:
                rk := k.(map[string]interface{})
                for rk1, rk2 := range rk {
                    if rk1 != "fields" {
                        continue
                    }
                    for _, kkl := range rk2.([]interface{}) {
                        k22 := kkl.(map[string]interface{})
                        columns[k22["name"].(string)] = k22["type"].(string)
                    }
                }
                
            }
        }
        
    }
    
    fmt.Println(columns)

}
map[email:string first_name:string id:int last_name:string]

Я смог извлечь столбцы, сделав это. Но я уверен, что это можно сделать лучше.

https://play.golang.org/p/e0-F4Un4Mzv

person Alok Kumar Singh    schedule 24.08.2020