de Anshul Sanghi

Cum să gestionați abonamentele GraphQL cu Go, GQLgen și MongoDB

Crearea unui server de date în timp real cu abonamente GraphQL și ChangeStreams MongoDB

Cum sa gestionati abonamentele GraphQL cu Go GQLgen si MongoDB

Dacă ați folosit GQLgen în trecut, știți că într-adevăr acceptă modele de abonament, dar implementarea pe care o utilizează nu funcționează exact cu MongoDB în mod corespunzător.

Pentru aceia dintre voi care nu au auzit sau nu au folosit încă GQLgen, este un pachet go care generează în mod esențial cod boilerplate din schemele dvs. GraphQL și vă oferă funcționalități suplimentare, cum ar fi configurarea unui server GraphQL etc. Vom folosi acest lucru pe scară largă pentru configurarea noastră GraphQL, așa că vă sugerez să aruncați o privire înainte de a continua, deoarece nu o voi acoperi prea mult aici. Un bun punct de plecare ar fi acest.

Vom construi un API care se ocupă de crearea / interogarea / actualizarea unui utilizator și care ascultă când un utilizator are o nouă notificare printr-un abonament.

A trebuit să fac câteva modificări la codul meu, precum și la codul generat de GQLgen pentru a funcționa corect, dar nu sunt foarte sigur dacă acesta este cel mai bun mod de a merge dintr-o perspectivă de performanță și mi-ar plăcea să am sugestii. De asemenea, acest lucru nu va acoperi totul în detaliu, cu excepția pieselor necesare, deoarece postul este deja suficient de lung ca atare.

Înființat

Să configurăm un proiect de pornire înainte de a ne arunca cu capul în cod. Creați un proiect nou în GOPATH și creați un pachet db în cadrul ei. Acest director va conține tot codul legat de baza de date în sine (în acest caz, MongoDB).

Apoi, instalați următoarele pachete necesare:

go get github.com/99designs/gqlgengo get github.com/gorilla/muxgo get github.com/globalsign/mgo

Următoarele pachete trebuie instalate și sunt utilizate numai de GQLgen intern. Nu vom lucra direct cu acestea, dar sunt necesare:

go get github.com/pkg/errorsgo get github.com/urfave/cligo get golang.org/x/tools/go/ast/astutilgo get golang.org/x/tools/go/loadergo get golang.org/x/tools/importsgo get gopkg.in/yaml.v2

Suntem gata să începem să scriem un cod 🙂

Configurare proiect

Voi folosi globalsign/mgo pachet pentru Golang ca driverul meu MongoDB, care este în esență o versiune de labix / mgo.v2 întreținută de comunitate. Verifică aici.

În directorul dvs. db, creați un fișier setup.go cu următorul cod:

package dbimport (   "fmt"   "github.com/globalsign/mgo")
var session *mgo.Sessionvar db *mgo.Database
func ConnectDB() {   session, err := mgo.Dial("mongodb://localhost:27017,localhost:27018")
   if err != nil {      fmt.Println(err)   }   session.SetMode(mgo.Monotonic, true)   db = session.DB("subscriptionstest")}
func GetCollection(collection string) *mgo.Collection {   return db.C(collection)}
func CloseSession() {   session.Close()}

Am deja un set de replici configurat pe porturi 27017 și 27018. Vă sugerez să faceți același lucru în acest moment înainte de a continua. Apoi, creați un scripts din proiectul dvs. și creați un fișier nou gqlgen.go cu următorul conținut:

// +build ignorepackage mainimport "github.com/99designs/gqlgen/cmd"func main() {   cmd.Execute()}

Acest lucru este necesar doar pentru a rula generatorul și așa că îl vom exclude din construcția noastră.

Acum, să creăm un pachet nou users și creați un fișier schema.graphql în acesta cu următorul cod:

schema {    query: Query    mutation: Mutation}type Query {    user(id: ID!): User!}
type Mutation {    createUser(input: NewUser!): User!    updateUser(input: UpdateUser!): User!    updateNotification(input: UpdateNotification): User!}
type User {    id: ID!    first: String!    last: String!    email: String!    notifications: [Notification!]!}
type Notification {    id: ID!    seen: Boolean!    text: String!    title: String!}
input NewUser {    email: String!}input UpdateUser {    id: ID!    first: String    last: String    email: String}input UpdateNotification {    id: ID!    userID: ID!    seen: Boolean!}
type Subscription {    notificationAdded(id: ID!): User!}

Acum, navigați la folderul utilizatorilor din linia de comandă și rulați

go run ../scripts/gqlgen.go init

Aceasta va genera 4 fișiere și anume resolver.go generated.go models_gen.go gqlgen.yml. De asemenea, va crea un folder numit server în pachetul de utilizatori, care va conține codul pentru rularea serverului GraphQL. Puteți elimina acest lucru, deoarece vom avea propriul nostru server la rădăcina proiectului, ceea ce ne-ar permite, de asemenea, să avem în cele din urmă mai multe puncte finale GraphQL servite de la un singur server.

Inițial, vom lucra numai cu resolver.go care deține practic logica pentru diverse interogări și mutații definite în fișierul nostru de schemă. Dar mai întâi, trebuie să mergem la models_gen.go fișier și adăugați fișierul bson:"_id" etichetați câmpul nostru ID din structura utilizatorului, astfel încât să putem obține id-ul din baza de date în această structură

type User struct {   ID            string         `json:"id" bson:"_id"`   First         string         `json:"first"`   Last          string         `json:"last"`   Email         string         `json:"email"`   Notifications []Notification `json:"notifications"`}

Acum, să configurăm rapid rezolvatoarele de bază, fără a intra în prea multe detalii. Veți observa că, în partea de sus a fișierului, veți vedea un cod similar cu acesta:

type Resolver struct{}func (r *Resolver) Mutation() MutationResolver {   return &mutationResolver{r}}func (r *Resolver) Query() QueryResolver {   return &queryResolver{r}}func (r *Resolver) Subscription() SubscriptionResolver {   return &subscriptionResolver{r}}

O vom înlocui cu aceasta:

type Resolver struct {   users *mgo.Collection}func New() Config {   return Config{      Resolvers: &Resolver{         users: db.GetCollection("users"),      },   }}func (r *Resolver) Mutation() MutationResolver {   r.users = db.GetCollection("users")   return &mutationResolver{r}}func (r *Resolver) Query() QueryResolver {   r.users = db.GetCollection("users")   return &queryResolver{r}}func (r *Resolver) Subscription() SubscriptionResolver {   r.users = db.GetCollection("users")   return &subscriptionResolver{r}}

Facem acest lucru, astfel încât să putem avea o referință la colecția noastră direct în structura resolver, ceea ce ne-ar face mai ușor să lucrăm cu colecția de-a lungul rezolvărilor. Voi explica semnificația New funcționează mai târziu când avem nevoie de el.

Haideți să configurăm rapid rezolvatorii noștri de bază.

CreateUser Resolver

func (r *mutationResolver) CreateUser(ctx context.Context, input NewUser) (User, error) {   var user User   count, err := r.users.Find(bson.M{"email": input.Email}).Count()   if err != nil {      return User{}, err   } else if count > 0 {      return User{}, errors.New("user with that email already exists")   }   err = r.users.Insert(bson.M{"email": input.Email,})   if err != nil {      return User{}, err   }   err = r.users.Find(bson.M{"email": input.Email}).One(&user)   if err != nil {      return User{}, err   }   return user, nil}

UpdateUser Resolver

func (r *mutationResolver) UpdateUser(ctx context.Context, input UpdateUser) (User, error) {   var fields = bson.M{}   var user User   update := false   if input.First != nil && *input.First != "" {      fields["first"] = *input.First      update = true   }   if input.Last != nil && *input.Last != "" {      fields["last"] = *input.Last      update = true   }   if input.Email != nil && *input.Email != "" {      fields["email"] = *input.Email      update = true   }   if !update {      return User{}, errors.New("no fields present for updating data")   }   err := r.users.UpdateId(bson.ObjectIdHex(input.ID), fields)   if err != nil {      return User{}, err   }   err = r.users.Find(bson.M{"_id": bson.ObjectIdHex(input.ID)}).One(&user)   if err != nil {      return User{}, err   }
   user.ID = bson.ObjectId(user.ID).Hex()
   return user, nil}

UpdateNotification Resolver

func (r *mutationResolver) UpdateNotification(ctx context.Context, input *UpdateNotification) (User, error) {   var user User   var oid = bson.ObjectIdHex(input.UserID)   if err := r.users.Find(bson.M{"_id": oid}).One(&user); err != nil {      return User{}, err   }   for index, val := range user.Notifications {      if bson.ObjectId(val.ID).Hex() == input.ID {         val.Seen = input.Seen         user.Notifications[index] = val         break      }   }   if err := r.users.UpdateId(oid, user); err != nil {      return User{}, err   }   return user, nil}

QueryUser Resolver

func (r *queryResolver) User(ctx context.Context, id string) (User, error) {   var user User   if err := r.users.FindId(bson.ObjectIdHex(id)).One(&user); err != nil {      return User{}, err   }   user.ID = bson.ObjectId(user.ID).Hex()   return user, nil}

Acum că am terminat cu configurarea, să trecem la partea principală.

Date MongoDB în timp real cu ChangeStreams

MongoDB acceptă acum date în timp real similare cu firebase începând de la versiunea 3.6. Configurarea nu este totuși la fel de ușoară. Există câteva condiții prealabile importante pentru ca fluxurile de schimbare să funcționeze corect:

  • Este disponibil numai pentru clusterele partajate și seturile de replici cu driverul WireTiger. MongoDB v3.6 + au WireTiger ca driver implicit, dar trebuie să configurăm singuri o replică.
  • Schimbarea fluxului este disponibilă numai dacă "majority" compatibilitatea cu citirea problemelor este activată (este activată implicit).

Iată cum ar arăta semnătura metodei noastre pentru NotificationAdded Resolver:

func (r *subscriptionResolver) NotificationAdded(ctx context.Context, id string) (<-chan User, error) {   panic("not implemented")}

Există o problemă cu această implementare și va trebui să o schimbăm ușor pentru a funcționa corect. Dar, mai întâi, să ne uităm la codul necesar în resolver, care ne va ușura, de asemenea, să înțelegem de ce a fost necesară schimbarea.

Mai întâi vom defini cele două variabile userDoc și change și configurați ascultătorul nostru changeStream astfel:

var userDoc Uservar change bson.Mcs, err := r.users.Watch([]bson.M{}, mgo.ChangeStreamOptions{MaxAwaitTimeMS: time.Hour, FullDocument: mgo.FullDocument("updateLookup")})
if err != nil {   return err}if cs.Err() != nil {   fmt.Println(err)}

Aici, urmărim schimbările în colecția de utilizatori. De asemenea, setăm expirarea pentru ChangeStream ca 1 oră. Acest lucru este necesar pentru a menține fluxul de schimbare în viață și pentru a nu închide automat. De asemenea, vom avea nevoie de documentul complet care a fost modificat și așa că definim acea setare și în ChangeStreamOptions. Funcția ceas returnează un cursor pe care îl putem itera apoi.

În continuare, vom începe un goroutine pentru gestionarea evenimentelor cursorului astfel:

go func() {   start := time.Now()   for {      ok := cs.Next(&change)      if ok {         byts, _ := bson.Marshal(change["fullDocument"].(bson.M))         bson.Unmarshal(byts, &userDoc)         userDoc.ID = bson.ObjectId(userDoc.ID).Hex()         if userDoc.ID == id {            *userChan <- userDoc         }      }      if time.Since(start).Minutes() >= 60 {         break      }      continue   }}()

Aici vom itera cursorul folosind cursor.Next() metoda și o buclă for. Ori de câte ori există un eveniment de schimbare, codul din bucla for va fi executat și datele de la acel eveniment ne vor fi disponibile în change variabil.

În esență, vom extrage câmpul complet al documentului din structura de schimbare ca type User în bucla for. Verificăm apoi dacă utilizatorul schimbat este același cu cel pe care îl caută abonamentul. Dacă da, îl trimitem la canalul nostru și așteptăm mai multe evenimente.

Acesta este, de asemenea, un moment bun pentru a discuta semnătura metodei pentru această metodă. Încă o dată, ați avea așa ceva:

func (r *subscriptionResolver) NotificationAdded(ctx context.Context, id string) (&lt;-chan User, error) {   ...}

Acesta primește un id care este ID-ul utilizatorului și se așteaptă ca un canal să fie returnat. Dacă returnăm un canal din această funcție, acesta va fi întotdeauna gol. Să ne uităm la generated.go pentru a înțelege mai bine acest lucru. Codul legat de această metodă specială ar arăta cam așa (este separat în fișier, dar acum agregez doar codul necesar):

type SubscriptionResolver interface {   NotificationAdded(ctx context.Context, id string) (&lt;-chan User, error)}
func (ec *executionContext) _Subscription_notificationAdded(ctx context.Context, field graphql.CollectedField) func() graphql.Marshaler {   rawArgs := field.ArgumentMap(ec.Variables)   args, err := field_Subscription_notificationAdded_args(rawArgs)   if err != nil {      ec.Error(ctx, err)      return nil   }   ctx = graphql.WithResolverContext(ctx, &graphql.ResolverContext{      Field: field,   })      rctx := ctx   results, err := ec.resolvers.Subscription().NotificationAdded(rctx, args["id"].(string))   if err != nil {      ec.Error(ctx, err)      return nil   }   return func() graphql.Marshaler {      res, ok := <-results      if !ok {         return nil      }      var out graphql.OrderedMap      out.Add(field.Alias, func() graphql.Marshaler {         return ec._User(ctx, field.Selections, &res)      }())      return &out   }}

Canalul returnat este apoi citit de codul generat pentru a primi actualizările și a-l transmite clientului nostru. Problema este că, odată ce returnăm canalul din rezolvatorul nostru, execuția funcției este deja terminată. Practic, ceea ce înseamnă că canalul nu va primi niciodată valori aici.

Pe de altă parte, dacă valorile au fost adăugate canalului înainte de a-l returna din funcție, în esență va trebui să așteptăm o oră pentru ca toate actualizările să fie trimise către client, deoarece așteptăm o oră pentru ca fluxurile de schimbare să fie timeout (cu condiția să folosim o implementare non-goroutină pentru cursorul nostru ChangeStream). Este clar că aceasta nu este o situație ideală. Să facem câteva modificări la codul de mai sus pentru a-l face să funcționeze pentru noi.

Mai întâi voi defini un canal în _Subscription_notificationAdded metoda al cărei indicator va fi apoi trecut la rezolvatorul nostru. Ar arăta cam așa:

func (ec *executionContext) _Subscription_notificationAdded(ctx context.Context, field graphql.CollectedField) func() graphql.Marshaler {   rawArgs := field.ArgumentMap(ec.Variables)   args, err := field_Subscription_notificationAdded_args(rawArgs)   if err != nil {      ec.Error(ctx, err)      return nil   }   ctx = graphql.WithResolverContext(ctx, &graphql.ResolverContext{      Field: field,   })
   userChan := make(chan User, 1)   rctx := ctx   go ec.resolvers.Subscription().NotificationAdded(rctx, args["id"].(string), &userChan)
   return func() graphql.Marshaler {      res, ok := <-userChan      if !ok {         return nil      }      var out graphql.OrderedMap      out.Add(field.Alias, func() graphql.Marshaler {         return ec._User(ctx, field.Selections, &res)      }())      return &out   }}

Din motive de performanță, creăm un nou canal cu o limită de 1 articol la un moment dat. Apoi îi trecem indicatorul către resolverul nostru și, de asemenea, facem apelul către acest resolver o goroutină.

_Subscription_notificationAdded metoda va returna apoi o funcție care ascultă fișierul userChan și transmite actualizarea către clientul nostru de fiecare dată când se primește o valoare.

De asemenea, trebuie să schimbăm semnătura metodei pentru metoda pe care tocmai am modificat-o, trebuie să o schimbăm

type SubscriptionResolver interface {   NotificationAdded(ctx context.Context, id string) (&lt;-chan User, error)}

la

type SubscriptionResolver interface {   NotificationAdded(ctx context.Context, id string, userChan *chan User) error}

Aceasta este toată modificarea de care avem nevoie. Odată ce ați terminat, iată ce este complet Notificare Adăugat rezolvarea abonamentului ar arata ca:

func (r *subscriptionResolver) NotificationAdded(ctx context.Context, id string, userChan *chan User) error {   var userDoc User   var change bson.M   cs, err := r.users.Watch([]bson.M{}, mgo.ChangeStreamOptions{MaxAwaitTimeMS: time.Hour, FullDocument: mgo.FullDocument("updateLookup")})   if err != nil {      return err   }   if cs.Err() != nil {      fmt.Println(err)   }   go func() {      start := time.Now()      for {         ok := cs.Next(&change)         if ok {            byts, _ := bson.Marshal(change["fullDocument"].(bson.M))            bson.Unmarshal(byts, &userDoc)            userDoc.ID = bson.ObjectId(userDoc.ID).Hex()            if userDoc.ID == id {               *userChan <- userDoc            }         }         if time.Since(start).Minutes() >= 60 {            break         }         continue      }   }()   return nil}

Acum, codul care trimite un element către canal și cel care îl primește sunt atât non-blocante, cât și rulează în fundal.

Phew! A fost o mulțime de muncă, dar asta a fost tot ceea ce a trebuit să facem. Să trecem la partea distractivă și să creăm un server și să vedem rezultatul eforturilor noastre.

Lucrurile distractive

Creați un fișier main.go la rădăcina proiectului dvs. cu următorul cod:

package main
import (   "fmt"   "github.com/gorilla/mux"   "github.com/gorilla/websocket"   "github.com/rs/cors"      "log"   "net/http"   "os"   "github.com/99designs/gqlgen/handler"   "<project path relative to GOPATH>/users"   "<project path relative to GOPATH>/db")
const defaultPort = "8080"func main() {   port := os.Getenv("PORT")if port == "" {   port = defaultPort}
db.ConnectDB()
c := cors.New(cors.Options{   AllowedOrigins:   []string{"http://localhost:" + port},   AllowCredentials: true,})r := mux.NewRouter()r.Handle("/", handler.Playground("User", "/users"))r.Handle("/users", c.Handler(handler.GraphQL(users.NewExecutableSchema(users.New()),   handler.WebsocketUpgrader(websocket.Upgrader{      CheckOrigin: func(r *http.Request) bool {         return true      },   }))),)http.Handle("/", r)log.Fatal(http.ListenAndServe(":8080", nil))}

GQLgen ne oferă câteva handler-uri încorporate, cum ar fi Playground și WebsocketUpgrader, care creează în esență o interfață de utilizare pentru testarea serverului nostru GraphQL și pentru a avea o conexiune WebSocket cu clienții.

De asemenea, nu uitați că am adăugat o funcție numită New la rezolvatorii noștri mai devreme, despre care am menționat că vom vorbi mai târziu? Ei bine, aici puteți vedea de ce a fost necesar. În esență, a returnat o structură de configurare care a fost solicitată de gestionarele furnizate de GQLgen pentru ca codul nostru să funcționeze corect. Puteți vedea că utilizează codul implicit users.Config{Resolvers: &users.Resolvers{}} direct, care este, de asemenea, bine, atâta timp cât includeți codul pentru users câmp în structura de rezolvare și setați-l la colecția utilizatorilor.

În acest moment, suntem gata să pornim serverul nostru GraphQL și să testăm lucrurile.

Alerga go build și apoi executați fișierul binar generat. Serverul ar trebui să ruleze până acum. Asigurați-vă că aveți setul de replici MongoDB rulând înainte de a încerca să rulați serverul nostru, altfel va genera o eroare. Tu poți să începi aici dacă aveți nevoie de ajutor pentru a rula un set de replici.

Creaza utilizator

Cum sa gestionati abonamentele GraphQL cu Go GQLgen si MongoDB

Actualizați utilizatorul

1611303010 387 Cum sa gestionati abonamentele GraphQL cu Go GQLgen si MongoDB

Interogare utilizator

1611303011 319 Cum sa gestionati abonamentele GraphQL cu Go GQLgen si MongoDB

Notificare Abonament adăugat

Și iată-l!

Încă o dată, vreau să subliniez că s-ar putea ca aceasta să nu fie soluția optimă a problemei la îndemână, dar este ideea mea despre o posibilă soluție și mi-ar plăcea să primesc feedback-ul și sugestiile dvs. în acest sens.

Mulțumesc pentru lectură. Câțiva ? sunt mereu apreciate?