Commit e603fa30 authored by Ajit Jagtap's avatar Ajit Jagtap
Browse files

Merge branch 'ab_Add_GraohDBTransactions' into 'devbranch'

Add: Transactions support for graph db operations.

See merge request !183
parents fb61eacd 118bae9c
Branches
Tags
2 merge requests!188V2 : mep deployemet28 april2020,!183Add: Transactions support for graph db operations.
Showing with 190 additions and 99 deletions
......@@ -30,6 +30,8 @@ type Instance struct {
type DGraphDAO struct {
HostName string
instance *Instance
}
var (
......@@ -38,6 +40,11 @@ var (
configured bool
)
var (
ErrNotConfigured = errors.New("graph db instances not configured. InitInstances() must be called to configure.")
ErrInstanceNotFound = errors.New("graph db instance not found")
)
// NewClient returns a new dgraph client for provided configuration.
func NewClient(h Host) (*dgo.Dgraph, error) {
......@@ -124,6 +131,10 @@ func InitInstances(configs []Host) error {
// GetInstance returns a preconfigured dgraph instance from cache. If not present, returns an error.
func GetInstance(hostName string) (*Instance, error) {
if !configured {
return nil, ErrNotConfigured
}
if hostName == "" {
hostName = defaultHost
}
......@@ -131,61 +142,41 @@ func GetInstance(hostName string) (*Instance, error) {
i, ok := instances[hostName]
if !ok {
return nil, errors.New("instance not found")
return nil, ErrInstanceNotFound
}
return i, nil
}
// GetDAO returns a dao instance to access and manipulate graph data and schema.
func GetDAO(hostName string) *DGraphDAO {
return &DGraphDAO{HostName: hostName}
}
// CreateSchema sets the provided schema for the nodes data.
func (dg *DGraphDAO) CreateSchema(ctx context.Context, schema string) error {
instance, err := GetInstance(dg.HostName)
//
// If hostname is empty, default host will be used.
//
// Otherwise ErrInstanceNotFound error will be returned.
func GetDAO(hostName string) (*DGraphDAO, error) {
ist, err := GetInstance(hostName)
if err != nil {
return err
return nil, err
}
return instance.client.Alter(ctx, &api.Operation{Schema: schema})
return &DGraphDAO{
HostName: hostName,
instance: ist,
}, nil
}
// SetData sets the provided data as a node. Can be used to create or update a node.
//
// For update, the data must contain `uid` field.
// GetTransaction returns a new transaction for provided host.
//
// Set `commitNow` to true to commit or discard the changes immediately.
func (dg *DGraphDAO) SetData(ctx context.Context, data []byte, commitNow bool) error {
return dg.mutate(ctx, data, &api.Mutation{
SetJson: data,
CommitNow: commitNow,
})
}
// DeleteData deletes the node or provided node attribute.
// If hostname is empty, transaction from default host will be returned.
//
// Set `commitNow` to true to commit or discard the changes immediately.
func (dg *DGraphDAO) DeleteData(ctx context.Context, data []byte, commitNow bool) error {
return dg.mutate(ctx, data, &api.Mutation{
DeleteJson: data,
CommitNow: commitNow,
})
}
// Otherwise ErrInstanceNotFound error will be returned.
func GetTransaction(hostName string) (*dgo.Txn, error) {
// mutate creates or updates the node data.
func (dg *DGraphDAO) mutate(ctx context.Context, data []byte, mtn *api.Mutation) error {
instance, err := GetInstance(dg.HostName)
ist, err := GetInstance(hostName)
if err != nil {
return err
return nil, err
}
txn := instance.client.NewTxn()
_, err = txn.Mutate(ctx, mtn)
return err
return ist.client.NewTxn(), nil
}
// GetData returns the nodes matching to the provided query.
......@@ -194,13 +185,12 @@ func (dg *DGraphDAO) mutate(ctx context.Context, data []byte, mtn *api.Mutation)
//
// The result is against the provided key in the query.
func (dg *DGraphDAO) GetData(ctx context.Context, query string, vars map[string]string) ([]byte, error) {
instance, err := GetInstance(dg.HostName)
if err != nil {
return nil, err
}
txn := instance.client.NewReadOnlyTxn().BestEffort()
var res *api.Response
txn := dg.instance.client.NewReadOnlyTxn().BestEffort()
var (
res *api.Response
err error
)
if len(vars) == 0 {
res, err = txn.Query(ctx, query)
} else {
......@@ -214,53 +204,120 @@ func (dg *DGraphDAO) GetData(ctx context.Context, query string, vars map[string]
return res.GetJson(), nil
}
// DropSchema deletes the current schema along with the data.
func (dg *DGraphDAO) DropSchema(ctx context.Context) error {
instance, err := GetInstance(dg.HostName)
if err != nil {
return err
}
// SetData sets the provided data as a node. Can be used to create or update a node.
//
// For update, the data must contain `uid` field.
func (dg *DGraphDAO) SetData(ctx context.Context, data []byte) error {
return dg.mutate(ctx, &api.Mutation{
SetJson: data,
CommitNow: true,
})
}
return instance.client.Alter(ctx, &api.Operation{DropOp: api.Operation_ALL})
// SetData sets the provided data as a node. Can be used to create or update a node.
//
// For update, the data must contain `uid` field.
//
// The transacction must be committed or discarded.
func (dg *DGraphDAO) SetDataTXN(ctx context.Context, txn *dgo.Txn, data []byte) error {
return dg.mutateTXN(ctx,
&api.Mutation{
SetJson: data,
},
txn,
)
}
// DropData deletes complete data but maintains the schema.
func (dg *DGraphDAO) DropData(ctx context.Context) error {
instance, err := GetInstance(dg.HostName)
if err != nil {
return err
}
// DeleteData deletes the node or provided node attribute.
func (dg *DGraphDAO) DeleteData(ctx context.Context, data []byte) error {
return dg.mutate(ctx, &api.Mutation{
DeleteJson: data,
CommitNow: true,
})
}
return instance.client.Alter(ctx, &api.Operation{DropOp: api.Operation_DATA})
// DeleteData deletes the node or provided node attribute.
//
// The transacction must be committed or discarded.
func (dg *DGraphDAO) DeleteDataTXN(ctx context.Context, txn *dgo.Txn, data []byte) error {
return dg.mutateTXN(ctx,
&api.Mutation{
DeleteJson: data,
},
txn,
)
}
// DropAttr deletes a specific attribute completely from data and the schema.
func (dg *DGraphDAO) DropAttr(ctx context.Context, attr string) error {
instance, err := GetInstance(dg.HostName)
if err != nil {
return err
// DeleteEdge deletes the edges for the mentioned node. predicate is the name of relationship between the node.
//
// Ex. Persion1 `follows` Person2.
func (dg *DGraphDAO) DeleteEdge(ctx context.Context, uid string, predicates ...string) error {
mu := &api.Mutation{
CommitNow: true,
}
return instance.client.Alter(ctx, &api.Operation{DropOp: api.Operation_ATTR, DropValue: attr})
dgo.DeleteEdges(mu, uid, predicates...)
return dg.mutate(ctx, mu)
}
// DropEdge deletes the edges for the mentioned node. predicate is the name of relationship between the node.
// DeleteEdgeTXN deletes the edges for the mentioned node. predicate is the name of relationship between the node.
//
// Ex. Persion1 `follows` Person2.
func (dg *DGraphDAO) DropEdge(ctx context.Context, uid string, predicates ...string) error {
instance, err := GetInstance(dg.HostName)
if err != nil {
return err
func (dg *DGraphDAO) DeleteEdgeTXN(ctx context.Context, txn *dgo.Txn, uid string, predicates ...string) error {
mu := &api.Mutation{
CommitNow: true,
}
mu := &api.Mutation{}
dgo.DeleteEdges(mu, uid, predicates...)
mu.CommitNow = true
_, err = instance.client.NewTxn().Mutate(ctx, mu)
return dg.mutateTXN(ctx, mu, txn)
}
// mutate creates or updates or deletes the node data.
func (dg *DGraphDAO) mutate(ctx context.Context, mtn *api.Mutation) error {
txn := dg.instance.client.NewTxn()
_, err := txn.Mutate(ctx, mtn)
return err
}
// mutate creates or updates the node data.
func (dg *DGraphDAO) mutateTXN(ctx context.Context, mtn *api.Mutation, txn *dgo.Txn) error {
_, err := txn.Mutate(ctx, mtn)
return err
}
// CreateSchema sets the provided schema for the nodes data.
func (dg *DGraphDAO) CreateSchema(ctx context.Context, schema string) error {
return dg.instance.client.Alter(ctx, &api.Operation{Schema: schema})
}
// DropSchema deletes the current schema along with the data.
func (dg *DGraphDAO) DropSchema(ctx context.Context) error {
return dg.instance.client.Alter(ctx, &api.Operation{DropOp: api.Operation_ALL})
}
// DropData deletes complete data but maintains the schema.
func (dg *DGraphDAO) DropData(ctx context.Context) error {
return dg.instance.client.Alter(ctx, &api.Operation{DropOp: api.Operation_DATA})
}
// DropAttr deletes a specific attribute completely from data and the schema.
func (dg *DGraphDAO) DropAttr(ctx context.Context, attr string) error {
return dg.instance.client.Alter(ctx, &api.Operation{DropOp: api.Operation_ATTR, DropValue: attr})
}
func bindDgraphServerWithPort(server string, port int) string {
// if port is empty then use default port 9080(GRPC Port) & bind to server ip
......
......@@ -23,17 +23,17 @@ func Test_NewClient(t *testing.T) {
}{
{
name: "success on valid connection",
args: args{h: Host{Name: "graphDBHost", Server: "10.1.20.14", Port: 9080}},
args: args{h: Host{Name: "graphDBHost", Server: dgraphHost.Server, Port: 9080}},
wantErr: false,
},
{
name: "fail on connection fail",
args: args{h: Host{Name: "graphDBHost", Server: "10.1.20.14", Port: 8080}},
args: args{h: Host{Name: "graphDBHost", Server: dgraphHost.Server, Port: 8080}},
wantErr: true,
},
{
name: "success on default port used",
args: args{h: Host{Name: "graphDBHost", Server: "10.1.20.14"}},
args: args{h: Host{Name: "graphDBHost", Server: dgraphHost.Server}},
wantErr: false,
},
{
......@@ -99,7 +99,12 @@ func Test_CreateSchema(t *testing.T) {
t.Errorf("InitInstances() error = %v, wantErr %v", err, tt.wantErr)
}
dg := GetDAO(dgraphHost.Name)
dg, err := GetDAO(dgraphHost.Name)
if (err != nil) != tt.wantErr {
t.Errorf("CreateSchema() error = %v, wantErr %v", err, tt.wantErr)
return
}
err = dg.CreateSchema(tt.args.c, tt.args.schema)
if (err != nil) != tt.wantErr {
t.Errorf("CreateSchema() error = %v, wantErr %v", err, tt.wantErr)
......@@ -111,9 +116,8 @@ func Test_CreateSchema(t *testing.T) {
func Test_SetData(t *testing.T) {
type args struct {
c context.Context
data []byte
commit bool
c context.Context
data []byte
}
tests := []struct {
......@@ -134,12 +138,12 @@ func Test_SetData(t *testing.T) {
"age": 37
}
}
}`), commit: true},
}`)},
wantErr: false,
},
{
name: "failure on incorrect data",
args: args{c: context.Background(), data: []byte(``), commit: true},
args: args{c: context.Background(), data: []byte(``)},
wantErr: true,
},
}
......@@ -150,8 +154,12 @@ func Test_SetData(t *testing.T) {
t.Errorf("InitInstances() error = %v, wantErr %v", err, tt.wantErr)
}
dg := GetDAO(dgraphHost.Name)
err = dg.SetData(tt.args.c, tt.args.data, tt.args.commit)
dg, err := GetDAO(dgraphHost.Name)
if err != nil && !tt.wantErr {
t.Errorf("SetData() error = %v", err)
}
err = dg.SetData(tt.args.c, tt.args.data)
if err != nil && !tt.wantErr {
t.Errorf("SetData() error = %v", err)
}
......@@ -192,7 +200,11 @@ func Test_DropAttr(t *testing.T) {
t.Errorf("InitInstances() error = %v, wantErr %v", err, tt.wantErr)
}
dg := GetDAO(dgraphHost.Name)
dg, err := GetDAO(dgraphHost.Name)
if err != nil && !tt.wantErr {
t.Errorf("DropAttr() error = %v", err)
}
err = dg.DropAttr(tt.args.c, tt.args.attr)
if err != nil && !tt.wantErr {
t.Errorf("DropAttr() error = %v", err)
......@@ -238,7 +250,11 @@ func Test_GetData(t *testing.T) {
t.Errorf("InitInstances() error = %v, wantErr %v", err, tt.wantErr)
}
dg := GetDAO(dgraphHost.Name)
dg, err := GetDAO(dgraphHost.Name)
if err != nil && !tt.wantErr {
t.Errorf("GetData() error = %v", err)
}
_, err = dg.GetData(tt.args.c, tt.args.query, tt.args.vars)
if err != nil && !tt.wantErr {
t.Errorf("GetData() error = %v", err)
......@@ -246,7 +262,7 @@ func Test_GetData(t *testing.T) {
}
}
func Test_DropEdge(t *testing.T) {
func Test_DeleteEdge(t *testing.T) {
type args struct {
c context.Context
uid string
......@@ -281,19 +297,22 @@ func Test_DropEdge(t *testing.T) {
t.Errorf("InitInstances() error = %v, wantErr %v", err, tt.wantErr)
}
dg := GetDAO(dgraphHost.Name)
err = dg.DropEdge(tt.args.c, tt.args.uid, tt.args.predicates...)
dg, err := GetDAO(dgraphHost.Name)
if err != nil && !tt.wantErr {
t.Errorf("DropEdge() error = %v", err)
t.Errorf("DeleteEdge() error = %v", err)
}
err = dg.DeleteEdge(tt.args.c, tt.args.uid, tt.args.predicates...)
if err != nil && !tt.wantErr {
t.Errorf("DeleteEdge() error = %v", err)
}
}
}
func Test_DeleteData(t *testing.T) {
type args struct {
c context.Context
data []byte
commit bool
c context.Context
data []byte
}
tests := []struct {
......@@ -305,12 +324,12 @@ func Test_DeleteData(t *testing.T) {
name: "success on delete correct data",
args: args{c: context.Background(), data: []byte(`{
"uid": "0x754f"
}`), commit: true},
}`)},
wantErr: false,
},
{
name: "failure on incorrect delete data",
args: args{c: context.Background(), data: []byte(``), commit: true},
args: args{c: context.Background(), data: []byte(``)},
wantErr: true,
},
}
......@@ -321,8 +340,12 @@ func Test_DeleteData(t *testing.T) {
t.Errorf("InitInstances() error = %v, wantErr %v", err, tt.wantErr)
}
dg := GetDAO(dgraphHost.Name)
err = dg.DeleteData(tt.args.c, tt.args.data, tt.args.commit)
dg, err := GetDAO(dgraphHost.Name)
if err != nil && !tt.wantErr {
t.Errorf("DeleteData() error = %v", err)
}
err = dg.DeleteData(tt.args.c, tt.args.data)
if err != nil && !tt.wantErr {
t.Errorf("DeleteData() error = %v", err)
}
......@@ -351,7 +374,11 @@ func Test_DropData(t *testing.T) {
t.Errorf("InitInstances() error = %v, wantErr %v", err, tt.wantErr)
}
dg := GetDAO(dgraphHost.Name)
dg, err := GetDAO(dgraphHost.Name)
if err != nil {
t.Errorf("DropData() error = %v, wantErr %v", err, tt.wantErr)
}
err = dg.DropData(tt.args.c)
if err != nil {
t.Errorf("DropData() error = %v, wantErr %v", err, tt.wantErr)
......@@ -382,7 +409,11 @@ func Test_DropSchema(t *testing.T) {
t.Errorf("InitInstances() error = %v, wantErr %v", err, tt.wantErr)
}
dg := GetDAO(dgraphHost.Name)
dg, err := GetDAO(dgraphHost.Name)
if err != nil && !tt.wantErr {
t.Errorf("DropSchema() error = %v", err)
}
err = dg.DropSchema(tt.args.c)
if err != nil && !tt.wantErr {
t.Errorf("DropSchema() error = %v", err)
......
......@@ -10,6 +10,8 @@ import (
"sync"
"time"
"github.com/dgraph-io/dgo"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/utiliymdl/guidmdl"
"github.com/tidwall/sjson"
......@@ -71,6 +73,7 @@ type AbstractBusinessLogicHolder struct {
IgnoreStrictMode bool
TXN *sql.Tx // transaction for MySQL
SQLServerTXN *sql.Tx // Transaction for SQLServer
GraphDbTXN *dgo.Txn
}
// SetGlobalConfig - SetGlobalConfig
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment