Newer
Older
"github.com/dgraph-io/dgo"
"github.com/dgraph-io/dgo/protos/api"
Name string `json:"hostName"`
Server string `json:"server"`
Port int `json:"port"`
IsDefault bool `json:"isDefault"`
IsDisabled bool `json:"IsDisabled"`
// UserName string
// Password string
}
type Instance struct {
client *dgo.Dgraph
host Host
}
type DGraphDAO struct {
HostName string
}
var (
instances map[string]*Instance
defaultHost string
configured bool
)
// NewClient returns a new dgraph client for provided configuration.
func NewClient(h Host) (*dgo.Dgraph, error) {
if strings.TrimSpace(h.Server) == "" {
return nil, errors.New("host address can not be empty")
}
address := bindDgraphServerWithPort(h.Server, h.Port)
// Dial a gRPC connection. The address to dial to can be configured when
// setting up the dgraph cluster.
dialOpts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBlock(), // block till we connect to the server
// grpc.WithTimeout(time.Second * 5),
grpc.WithDefaultCallOptions(
grpc.WaitForReady(true),
),
}
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) // wait for 5 seconds to connect to grpc server. Exit if the deadline exceeds.
defer cancel()
d, err := grpc.DialContext(ctx, address, dialOpts...)
if err == context.DeadlineExceeded {
return nil, errors.New("graphdb connect error, connection timed out for host " + address)
if err != nil {
return nil, err
}
client := dgo.NewDgraphClient(api.NewDgraphClient(d))
// Note: Supported in Enterprise version only
// if h.UserName != "" {
// if err = client.Login(context.TODO(), h.UserName, h.Password); err != nil {
// return nil, err
// }
// }
return client, nil
}
// NewInstance creates n new v2 instance of dgraph client. This instance can be saved in cache with host name as identifier for further operations.
func NewInstance(client *dgo.Dgraph, host Host) *Instance {
return &Instance{
client: client,
host: host,
}
}
func InitInstances(configs []Host) error {
if configured {
return nil
}
instances = make(map[string]*Instance, len(configs))
for _, host := range configs {
if host.IsDisabled {
continue
}
instances[host.Name] = &Instance{
client: client,
host: host,
if host.IsDefault {
defaultHost = host.Name
}
// GetInstance returns a preconfigured dgraph instance from cache. If not present, returns an error.
func GetInstance(hostName string) (*Instance, error) {
if hostName == "" {
hostName = defaultHost
}
i, ok := instances[hostName]
if !ok {
return nil, errors.New("instance not found")
}
return i, nil
}
// GetDAO returns a dao instance to access and manipulate graph data and schema.
func GetDAO(hostName string) *DGraphDAO {
return &DGraphDAO{HostName: hostName}
}
// CreateSchema sets the provided schema for the nodes data.
func (dg *DGraphDAO) CreateSchema(ctx context.Context, schema string) error {
instance, err := GetInstance(dg.HostName)
return instance.client.Alter(ctx, &api.Operation{Schema: schema})
// SetData sets the provided data as a node. Can be used to create or update a node.
//
// For update, the data must contain `uid` field.
//
// Set `commitNow` to true to commit or discard the changes immediately.
func (dg *DGraphDAO) SetData(ctx context.Context, data []byte, commitNow bool) error {
return dg.mutate(ctx, data, &api.Mutation{
SetJson: data,
CommitNow: commitNow,
// DeleteData deletes the node or provided node attribute.
//
// Set `commitNow` to true to commit or discard the changes immediately.
func (dg *DGraphDAO) DeleteData(ctx context.Context, data []byte, commitNow bool) error {
return dg.mutate(ctx, data, &api.Mutation{
DeleteJson: data,
CommitNow: commitNow,
})
}
func (dg *DGraphDAO) mutate(ctx context.Context, data []byte, mtn *api.Mutation) error {
instance, err := GetInstance(dg.HostName)
txn := instance.client.NewTxn()
_, err = txn.Mutate(ctx, mtn)
return err
// GetData returns the nodes matching to the provided query.
//
// query variables can be provided in `vars` param. Safe to provide `nil` if no variables required.
//
// The result is against the provided key in the query.
func (dg *DGraphDAO) GetData(ctx context.Context, query string, vars map[string]string) ([]byte, error) {
instance, err := GetInstance(dg.HostName)
if err != nil {
return nil, err
}
txn := instance.client.NewReadOnlyTxn().BestEffort()
var res *api.Response
if len(vars) == 0 {
res, err = txn.Query(ctx, query)
} else {
res, err = txn.QueryWithVars(ctx, query, vars)
}
// DropSchema deletes the current schema along with the data.
func (dg *DGraphDAO) DropSchema(ctx context.Context) error {
instance, err := GetInstance(dg.HostName)
if err != nil {
return err
}
return instance.client.Alter(ctx, &api.Operation{DropOp: api.Operation_ALL})
}
func (dg *DGraphDAO) DropData(ctx context.Context) error {
instance, err := GetInstance(dg.HostName)
if err != nil {
return err
return instance.client.Alter(ctx, &api.Operation{DropOp: api.Operation_DATA})
}
// DropAttr deletes a specific attribute completely from data and the schema.
func (dg *DGraphDAO) DropAttr(ctx context.Context, attr string) error {
instance, err := GetInstance(dg.HostName)
return instance.client.Alter(ctx, &api.Operation{DropOp: api.Operation_ATTR, DropValue: attr})
}
// DropEdge deletes the edges for the mentioned node. predicate is the name of relationship between the node.
//
// Ex. Persion1 `follows` Person2.
func (dg *DGraphDAO) DropEdge(ctx context.Context, uid string, predicates ...string) error {
instance, err := GetInstance(dg.HostName)
mu := &api.Mutation{}
dgo.DeleteEdges(mu, uid, predicates...)
mu.CommitNow = true
_, err = instance.client.NewTxn().Mutate(ctx, mu)
return err
func bindDgraphServerWithPort(server string, port int) string {
// if port is empty then use default port 9080(GRPC Port) & bind to server ip
if port <= 0 || strings.TrimSpace(strconv.Itoa(port)) == "" {