Newer
Older
import (
"sync"
"time"
cron "gopkg.in/robfig/cron.v2"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/errormdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/filemdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/loggermdl"
"github.com/pquerna/ffjson/ffjson"
// Statistic - for app application
type Statistic struct {
MaxTime time.Duration `json:"maxTime"`
MinTime time.Duration `json:"minTime"`
TotalTime time.Duration `json:"totalTime"`
ErrorTime *time.Time `json:"errorTime"`
LastError string `json:"lastError"`
Description string `json:"description"`
IsRestricted bool `json:"isRestricted"`
IsRoleBased bool `json:"isRoleBased"`
Branch string `json:"branch"` // Only applicable to activity stats
}
type groupResponse struct {
GroupTime string `json:"name"`
Hits int64 `json:"hits"`
QueryState map[string]Statistic `json:"queryState"`
TotalSMS int64 `json:"totalSMS"`
TotalEmail int64 `json:"totalEmail"`
TotalOTP int64 `json:"totalOTP"`
OTPInCache int64 `json:"otpInCache"`
TotalMongoHits int64 `json:"totalMongoHits"`
TotalMySQLHits int64 `json:"totalMySQLHits"`
TotalHits int64 `json:"totalHits"`
CacheHits int64 `json:"cacheHits"`
CacheMiss int64 `json:"cacheMiss"`
StartTime time.Time `json:"startTime"`
NextTime time.Time `json:"nextTime"`
GroupReport []groupResponse `json:"groupReport"`
}
type entityHits struct {
TotalSMS int64 `json:"totalSMS"`
TotalEmail int64 `json:"totalEmail"`
TotalOTP int64 `json:"totalOTP"`
OTPInCache int64 `json:"otpInCache"`
Mutex *sync.Mutex
}
type dbHits struct {
MongoHits int64 `json:"mongoHits"`
MySQLHits int64 `json:"mysqlHits"`
Mutex *sync.Mutex
type cacheStates struct {
totalHits int64
cacheHits int64
cacheMiss int64
cacheHitsMutex *sync.Mutex
}
// entityHits - manages hits for SMS and EMAILS
var entityHitsObj entityHits
// dbHitsObj - manages hits for Mongo and MySQL
var dbHitsObj dbHits
var stateMutex = &sync.Mutex{}
var cacheStatistic *cacheStates
// serverStartTime - server start time
var serverStartTime time.Time
var initStatus = false
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// cacheStatistic = &cacheStates{
// cacheHitsMutex: &sync.Mutex{},
// }
// entityHitsObj.Mutex = &sync.Mutex{}
// dbHitsObj.Mutex = &sync.Mutex{}
// serverStartTime = time.Now()
// ba := readStatisticsFile()
// unmarshalErr := ffjson.Unmarshal(ba, &clientResponseData)
// if errormdl.CheckErr(unmarshalErr) != nil {
// loggermdl.LogError(unmarshalErr)
// return
// }
// stateCache = clientResponseData.ServicesState
// if stateCache == nil {
// stateCache = make(map[string]Statistic)
// }
// queryCache = clientResponseData.QueryState
// if queryCache == nil {
// queryCache = make(map[string]Statistic)
// }
// clientResponseData.NextTime = time.Now().Add(600 * time.Second)
// c := cron.New()
// c.AddFunc("@every 30s", collectStatistics)
// c.Start()
}
// Init - Init
func Init(name string) {
projectName = name
cacheStatistic = &cacheStates{
cacheHitsMutex: &sync.Mutex{},
}
entityHitsObj.Mutex = &sync.Mutex{}
dbHitsObj.Mutex = &sync.Mutex{}
serverStartTime = time.Now()
ba := readStatisticsFile()
unmarshalErr := ffjson.Unmarshal(ba, &clientResponseData)
if errormdl.CheckErr(unmarshalErr) != nil {
loggermdl.LogError(unmarshalErr)
return
}
stateCache = clientResponseData.ServicesState
if stateCache == nil {
stateCache = make(map[string]Statistic)
}
queryCache = clientResponseData.QueryState
if queryCache == nil {
queryCache = make(map[string]Statistic)
}
clientResponseData.NextTime = time.Now().Add(600 * time.Second)
c := cron.New()
c.AddFunc("@every 30s", collectStatistics)
c.Start()
}
func readStatisticsFile() []byte {
filePath := getFilePath()
if !filemdl.FileAvailabilityCheck(filePath) {
return []byte("{}")
}
ba, readErr := filemdl.ReadFile(filePath)
if errormdl.CheckErr(readErr) != nil {
loggermdl.LogError(readErr)
return []byte("{}")
}
return ba
}
func getFilePath() string {
if projectName == "" {
return "./statistics.json"
}
return "./statistics/" + projectName + ".json"
if !initStatus {
return
}
cacheStatistic.totalHits++
cacheStatistic.cacheHitsMutex.Unlock()
}
func updateGroupCache(hitCount int64) {
if !initStatus {
return
}
}
// UpdateServiceState - update entry of service in state map
func UpdateServiceState(serviceName string, servingTime time.Duration, serviceError error, isRestricted, isRoleBased bool) {
UpdateServiceStateWithBranch(serviceName, "main", servingTime, serviceError, isRestricted, isRoleBased)
}
func concatenateNameWithBranch(name, branch string) string {
if branch == "" {
return name + "_main"
}
return name + "_" + branch
}
// UpdateServiceStateWithBranch - update entry of service from a branch in state map
func UpdateServiceStateWithBranch(serviceName, branch string, servingTime time.Duration, serviceError error, isRestricted, isRoleBased bool) {
if !initStatus {
return
}
key := concatenateNameWithBranch(serviceName, branch)
serviceState, ok := stateCache[key]
IsRestricted: isRestricted,
IsRoleBased: isRoleBased,
}
}
serviceState.TotalHits++
if serviceError != nil {
serviceState.ErrorCount++
serviceState.LastError = serviceError.Error()
ct := time.Now()
serviceState.ErrorTime = &ct
} else {
serviceState.TotalTime += servingTime
if servingTime > serviceState.MaxTime {
serviceState.MaxTime = servingTime
}
if servingTime < serviceState.MinTime || serviceState.MinTime == 0 {
stateCache[key] = serviceState
// UpdateQueryState - update entry of service in state map
func UpdateQueryState(queryName string, name string, servingTime time.Duration, serviceError error, isRestricted, isRoleBased bool) {
if !initStatus {
return
}
stateMutex.Lock()
queryState, ok := queryCache[queryName]
if !ok {
queryState = Statistic{
ServiceName: queryName,
IsRestricted: isRestricted,
IsRoleBased: isRoleBased,
}
}
queryState.TotalHits++
if serviceError != nil {
queryState.ErrorCount++
queryState.LastError = serviceError.Error()
ct := time.Now()
queryState.ErrorTime = &ct
} else {
queryState.TotalTime += servingTime
if servingTime > queryState.MaxTime {
queryState.MaxTime = servingTime
}
if servingTime < queryState.MinTime || queryState.MinTime == 0 {
queryState.MinTime = servingTime
}
}
queryCache[queryName] = queryState
stateMutex.Unlock()
}
// UpdateGlobalServiceCacheState - update only cache hits and miss count for all services
func UpdateGlobalServiceCacheState(cacheHit bool) {
if !initStatus {
return
}
cacheStatistic.cacheHitsMutex.Lock()
defer cacheStatistic.cacheHitsMutex.Unlock()
if cacheHit {
cacheStatistic.cacheHits++
} else {
cacheStatistic.cacheMiss++
}
}
// EmailHits - update only email hits count for all services
func EmailHits() {
if !initStatus {
return
}
entityHitsObj.Mutex.Lock()
defer entityHitsObj.Mutex.Unlock()
entityHitsObj.TotalEmail++
}
// OTPHits - update only OTPs hits count for all services -- DPK [12-June-2018]
func OTPHits() {
if !initStatus {
return
}
entityHitsObj.Mutex.Lock()
entityHitsObj.TotalOTP++
entityHitsObj.Mutex.Unlock()
}
// OTPInCache - update only OTPs hits count for all services -- DPK [12-June-2018]
func OTPInCache(count int64) {
if !initStatus {
return
}
entityHitsObj.Mutex.Lock()
entityHitsObj.OTPInCache = count
entityHitsObj.Mutex.Unlock()
}
// SMSHits - update only sms hits count for all services
func SMSHits() {
if !initStatus {
return
}
entityHitsObj.Mutex.Lock()
defer entityHitsObj.Mutex.Unlock()
entityHitsObj.TotalSMS++
}
// MongoHits - update only mongo hits count for all services
func MongoHits() {
if !initStatus {
return
}
dbHitsObj.Mutex.Lock()
defer dbHitsObj.Mutex.Unlock()
dbHitsObj.MongoHits++
}
// MySQLHits - update only MySQL hits count for all services
func MySQLHits() {
if !initStatus {
return
}
dbHitsObj.Mutex.Lock()
defer dbHitsObj.Mutex.Unlock()
dbHitsObj.MySQLHits++
}
func collectStatistics() {
if !initStatus {
return
}
clientReponseMutex.Lock()
defer clientReponseMutex.Unlock()
clientResponseData.StartTime = serverStartTime
cacheStatistic.cacheHitsMutex.Lock()
clientResponseData.TotalHits += cacheStatistic.totalHits
clientResponseData.CacheHits += cacheStatistic.cacheHits
clientResponseData.CacheMiss += cacheStatistic.cacheMiss
cacheStatistic.totalHits = 0
cacheStatistic.cacheMiss = 0
cacheStatistic.cacheHits = 0
cacheStatistic.cacheHitsMutex.Unlock()
entityHitsObj.Mutex.Lock()
clientResponseData.TotalEmail += entityHitsObj.TotalEmail
entityHitsObj.TotalEmail = 0
clientResponseData.OTPInCache = entityHitsObj.OTPInCache
entityHitsObj.OTPInCache = 0
clientResponseData.TotalOTP += entityHitsObj.TotalOTP
entityHitsObj.TotalOTP = 0
clientResponseData.TotalSMS += entityHitsObj.TotalSMS
entityHitsObj.TotalSMS = 0
entityHitsObj.Mutex.Unlock()
// DB hits collect
dbHitsObj.Mutex.Lock()
clientResponseData.TotalMongoHits += dbHitsObj.MongoHits
dbHitsObj.MongoHits = 0
clientResponseData.TotalMySQLHits += dbHitsObj.MySQLHits
dbHitsObj.MySQLHits = 0
dbHitsObj.Mutex.Unlock()
groupMutex.Lock()
current := time.Now()
if current.After(clientResponseData.NextTime) || len(clientResponseData.GroupReport) == 0 {
gr := groupResponse{}
gr.GroupTime = current.String()
gr.Hits = groupCount
clientResponseData.GroupReport = append(clientResponseData.GroupReport, gr)
clientResponseData.NextTime = time.Now().Add(30 * time.Second)
} else {
clientResponseData.GroupReport[len(clientResponseData.GroupReport)-1].Hits += groupCount
}
length := len(clientResponseData.GroupReport)
if length > lastKept {
clientResponseData.GroupReport = clientResponseData.GroupReport[length-lastKept:]
}
groupCount = 0
groupMutex.Unlock()
clientResponseData.ServicesState = stateCache
// the marshall function reads data from clientResponseData.ServicesState and clientResponseData.QueryState
// as both are maps, we have passed a referrence of stateCache and queryCache respectively.
// The following lock guards these two underlying data structures.
stateMutex.Lock()
defer stateMutex.Unlock()
ba, marshalError := ffjson.Marshal(clientResponseData)
if errormdl.CheckErr(marshalError) != nil {
loggermdl.LogError(marshalError)
return
}
writeErr := filemdl.WriteFile(getFilePath(), ba, true, false)
if errormdl.CheckErr(writeErr) != nil {
loggermdl.LogError(writeErr)
}
}