// Package dgraph wraps the dgo client: it keeps a registry of configured
// dgraph hosts and exposes a small DAO for querying and mutating data and schema.
package dgraph

import (
	"context"
	"errors"
	"strconv"
	"strings"
	"time"

	"github.com/dgraph-io/dgo"
	"github.com/dgraph-io/dgo/protos/api"
	"google.golang.org/grpc"
)

// Host holds the connection settings of a single dgraph server.
type Host struct {
	Name       string `json:"hostName"`   // identifier used to look the instance up
	Server     string `json:"server"`     // server address without the port
	Port       int    `json:"port"`       // gRPC port; a value <= 0 selects the default port
	IsDefault  bool   `json:"isDefault"`  // marks the host used when no host name is given
	IsDisabled bool   `json:"IsDisabled"` // disabled hosts are skipped by InitInstances

	// UserName string
	// Password string
}

// Instance couples a connected dgraph client with the host configuration it
// was created from.
type Instance struct {
	client *dgo.Dgraph
	host   Host
}

// DGraphDAO gives access to the data and schema of one dgraph instance.
type DGraphDAO struct {
	// HostName is the host name the DAO was requested with (see GetDAO).
	HostName string
	instance *Instance
}

var (
	instances   map[string]*Instance // registered instances keyed by Host.Name
	defaultHost string               // name of the host flagged IsDefault
	configured  bool                 // set once InitInstances has completed successfully
)

var (
	// ErrNotConfigured is returned when an instance is requested before
	// InitInstances has completed successfully.
	ErrNotConfigured = errors.New("graph db instances not configured. InitInstances() must be called to configure.")
	// ErrInstanceNotFound is returned when no instance is registered under
	// the requested host name.
	ErrInstanceNotFound = errors.New("graph db instance not found")
)

// NewClient returns a new dgraph client for the provided configuration.
//
// The server address is trimmed of surrounding whitespace and must not be
// empty. The gRPC dial blocks until the connection is established and is
// abandoned after 5 seconds, in which case a "connection timed out" error
// naming the dialed address is returned.
func NewClient(h Host) (*dgo.Dgraph, error) {
	server := strings.TrimSpace(h.Server)
	if server == "" {
		return nil, errors.New("host address can not be empty")
	}

	// Dial the trimmed address: the emptiness check above already ignores
	// surrounding whitespace, so the dial target must as well.
	address := bindDgraphServerWithPort(server, h.Port)

	// Dial a gRPC connection. The address to dial to can be configured when
	// setting up the dgraph cluster.
	dialOpts := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithBlock(), // block till we connect to the server
		// grpc.WithTimeout(time.Second * 5),
		grpc.WithDefaultCallOptions(
			// grpc.UseCompressor(gzip.Name),
			grpc.WaitForReady(true),
		),
	}

	// Wait for 5 seconds to connect to the gRPC server; give up if the
	// deadline is exceeded.
	ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5)
	defer cancel()

	conn, err := grpc.DialContext(ctx, address, dialOpts...)
	// errors.Is also matches a wrapped deadline error, unlike a plain ==.
	if errors.Is(err, context.DeadlineExceeded) {
		return nil, errors.New("graphdb connect error, connection timed out for host " + address)
	}
	if err != nil {
		return nil, err
	}

	client := dgo.NewDgraphClient(api.NewDgraphClient(conn))

	// Note: Supported in Enterprise version only
	// if h.UserName != "" {
	// 	if err = client.Login(context.TODO(), h.UserName, h.Password); err != nil {
	// 		return nil, err
	// 	}
	// }

	return client, nil
}

// NewInstance creates a new Instance from a dgraph client and its host
// configuration. The instance can be cached under the host name for further
// operations.
func NewInstance(client *dgo.Dgraph, host Host) *Instance {
	return &Instance{
		client: client,
		host:   host,
	}
}

// InitInstances connects to every enabled host in configs and registers the
// resulting instances under their host names. The name of a host flagged
// IsDefault is remembered as the default host.
//
// Once initialization has succeeded, further calls return nil without doing
// anything. If connecting to any host fails, that error is returned and the
// package-level registry is left untouched, so a later call can retry.
func InitInstances(configs []Host) error {
	if configured {
		return nil
	}

	// Build into locals and publish only after every host connected, so a
	// failure half-way never leaves a partially populated registry behind.
	pending := make(map[string]*Instance, len(configs))
	pendingDefault := defaultHost

	for _, host := range configs {
		if host.IsDisabled {
			continue
		}

		client, err := NewClient(host)
		if err != nil {
			return err
		}

		pending[host.Name] = NewInstance(client, host)
		if host.IsDefault {
			pendingDefault = host.Name
		}
	}

	instances = pending
	defaultHost = pendingDefault
	configured = true

	return nil
}

// GetInstance returns a preconfigured dgraph instance from the registry.
//
// An empty hostName selects the default host. ErrNotConfigured is returned
// when InitInstances has not completed yet, and ErrInstanceNotFound when no
// instance is registered under the resolved name.
func GetInstance(hostName string) (*Instance, error) {
	if !configured {
		return nil, ErrNotConfigured
	}

	if hostName == "" {
		hostName = defaultHost
	}

	i, ok := instances[hostName]
	if !ok {
		return nil, ErrInstanceNotFound
	}

	return i, nil
}

// GetDAO returns a dao instance to access and manipulate graph data and schema.
//
// If hostName is empty, the default host is used; the returned DAO's HostName
// field keeps the value that was passed in.
//
// Errors from GetInstance (ErrNotConfigured, ErrInstanceNotFound) are
// returned unchanged.
func GetDAO(hostName string) (*DGraphDAO, error) {
	ist, err := GetInstance(hostName)
	if err != nil {
		return nil, err
	}

	return &DGraphDAO{
		HostName: hostName,
		instance: ist,
	}, nil
}

// GetTransaction returns a new transaction for the provided host.
//
// If hostName is empty, a transaction from the default host is returned.
//
// Errors from GetInstance (ErrNotConfigured, ErrInstanceNotFound) are
// returned unchanged.
func GetTransaction(hostName string) (*dgo.Txn, error) { ist, err := GetInstance(hostName) if err != nil { return nil, err } return ist.client.NewTxn(), nil } // GetData returns the nodes matching to the provided query. // // query variables can be provided in `vars` param. Safe to provide `nil` if no variables required. // // The result is against the provided key in the query. func (dg *DGraphDAO) GetData(ctx context.Context, query string, vars map[string]string) ([]byte, error) { txn := dg.instance.client.NewReadOnlyTxn().BestEffort() var ( res *api.Response err error ) if len(vars) == 0 { res, err = txn.Query(ctx, query) } else { res, err = txn.QueryWithVars(ctx, query, vars) } if err != nil { return nil, err } return res.GetJson(), nil } // SetData sets the provided data as a node. Can be used to create or update a node. // // For update, the data must contain `uid` field. func (dg *DGraphDAO) SetData(ctx context.Context, data []byte) error { return dg.mutate(ctx, &api.Mutation{ SetJson: data, CommitNow: true, }) } // SetData sets the provided data as a node. Can be used to create or update a node. // // For update, the data must contain `uid` field. // // The transacction must be committed or discarded. func (dg *DGraphDAO) SetDataTXN(ctx context.Context, txn *dgo.Txn, data []byte) error { return dg.mutateTXN(ctx, &api.Mutation{ SetJson: data, }, txn, ) } // DeleteData deletes the node or provided node attribute. func (dg *DGraphDAO) DeleteData(ctx context.Context, data []byte) error { return dg.mutate(ctx, &api.Mutation{ DeleteJson: data, CommitNow: true, }) } // DeleteData deletes the node or provided node attribute. // // The transacction must be committed or discarded. func (dg *DGraphDAO) DeleteDataTXN(ctx context.Context, txn *dgo.Txn, data []byte) error { return dg.mutateTXN(ctx, &api.Mutation{ DeleteJson: data, }, txn, ) } // DeleteEdge deletes the edges for the mentioned node. predicate is the name of relationship between the node. // // Ex. 
Persion1 `follows` Person2. func (dg *DGraphDAO) DeleteEdge(ctx context.Context, uid string, predicates ...string) error { mu := &api.Mutation{ CommitNow: true, } dgo.DeleteEdges(mu, uid, predicates...) // // mu.CommitNow = true // _, err := dg.instance.client.NewTxn().Mutate(ctx, mu) return dg.mutate(ctx, mu) } // DeleteEdgeTXN deletes the edges for the mentioned node. predicate is the name of relationship between the node. // // Ex. Persion1 `follows` Person2. func (dg *DGraphDAO) DeleteEdgeTXN(ctx context.Context, txn *dgo.Txn, uid string, predicates ...string) error { mu := &api.Mutation{ CommitNow: true, } dgo.DeleteEdges(mu, uid, predicates...) return dg.mutateTXN(ctx, mu, txn) } // mutate creates or updates or deletes the node data. func (dg *DGraphDAO) mutate(ctx context.Context, mtn *api.Mutation) error { txn := dg.instance.client.NewTxn() _, err := txn.Mutate(ctx, mtn) return err } // mutate creates or updates the node data. func (dg *DGraphDAO) mutateTXN(ctx context.Context, mtn *api.Mutation, txn *dgo.Txn) error { _, err := txn.Mutate(ctx, mtn) return err } // CreateSchema sets the provided schema for the nodes data. func (dg *DGraphDAO) CreateSchema(ctx context.Context, schema string) error { return dg.instance.client.Alter(ctx, &api.Operation{Schema: schema}) } // DropSchema deletes the current schema along with the data. func (dg *DGraphDAO) DropSchema(ctx context.Context) error { return dg.instance.client.Alter(ctx, &api.Operation{DropOp: api.Operation_ALL}) } // DropData deletes complete data but maintains the schema. func (dg *DGraphDAO) DropData(ctx context.Context) error { return dg.instance.client.Alter(ctx, &api.Operation{DropOp: api.Operation_DATA}) } // DropAttr deletes a specific attribute completely from data and the schema. 
func (dg *DGraphDAO) DropAttr(ctx context.Context, attr string) error { return dg.instance.client.Alter(ctx, &api.Operation{DropOp: api.Operation_ATTR, DropValue: attr}) } func bindDgraphServerWithPort(server string, port int) string { // if port is empty then use default port 9080(GRPC Port) & bind to server ip if port <= 0 || strings.TrimSpace(strconv.Itoa(port)) == "" { return server + ":9080" } return server + ":" + strconv.Itoa(port) }