// Package statemdl collects in-process usage statistics (per-service and
// per-query timings, cache, messaging and database hit counters) and
// periodically persists them to a JSON file.
package statemdl

import (
	"sync"
	"time"

	"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/errormdl"
	"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/filemdl"
	"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/loggermdl"
	"github.com/pquerna/ffjson/ffjson"
	cron "gopkg.in/robfig/cron.v2"
)

// projectName - set by Init; selects the statistics file (see getFilePath)
var projectName string

// Statistic - accumulated counters and timings for one service or query
type Statistic struct {
	ServiceName  string        `json:"serviceName"`
	Name         string        `json:"name"`
	TotalHits    int64         `json:"totalHits"`
	MaxTime      time.Duration `json:"maxTime"`
	MinTime      time.Duration `json:"minTime"`
	TotalTime    time.Duration `json:"totalTime"`
	ErrorCount   int64         `json:"errorCount"`
	ErrorTime    *time.Time    `json:"errorTime"`
	LastError    string        `json:"lastError"`
	Description  string        `json:"description"`
	IsRestricted bool          `json:"isRestricted"`
	IsRoleBased  bool          `json:"isRoleBased"`
	Branch       string        `json:"branch"` // Only applicable to activity stats
}

// groupResponse - hits accumulated in one reporting window of the group report
type groupResponse struct {
	GroupTime string `json:"name"`
	Hits      int64  `json:"hits"`
}

// clientResponse - the complete statistics document that is marshalled to,
// and loaded from, the statistics file
type clientResponse struct {
	ServicesState  map[string]Statistic `json:"servicesState"`
	QueryState     map[string]Statistic `json:"queryState"`
	TotalSMS       int64                `json:"totalSMS"`
	TotalEmail     int64                `json:"totalEmail"`
	TotalOTP       int64                `json:"totalOTP"`
	OTPInCache     int64                `json:"otpInCache"`
	TotalMongoHits int64                `json:"totalMongoHits"`
	TotalMySQLHits int64                `json:"totalMySQLHits"`
	TotalHits      int64                `json:"totalHits"`
	CacheHits      int64                `json:"cacheHits"`
	CacheMiss      int64                `json:"cacheMiss"`
	StartTime      time.Time            `json:"startTime"`
	NextTime       time.Time            `json:"nextTime"`
	GroupReport    []groupResponse      `json:"groupReport"`
}

// entityHits - SMS, email and OTP counters gathered between two
// collectStatistics runs; Mutex guards all fields
type entityHits struct {
	TotalSMS   int64 `json:"totalSMS"`
	TotalEmail int64 `json:"totalEmail"`
	TotalOTP   int64 `json:"totalOTP"`
	OTPInCache int64 `json:"otpInCache"`
	Mutex      *sync.Mutex
}

// dbHits - Mongo and MySQL counters gathered between two collectStatistics
// runs; Mutex guards all fields
type dbHits struct {
	MongoHits int64 `json:"mongoHits"`
	MySQLHits int64 `json:"mysqlHits"`
	Mutex     *sync.Mutex
}

// cacheStates - total/cache hit and miss counters gathered between two
// collectStatistics runs; cacheHitsMutex guards all fields
type cacheStates struct {
	totalHits      int64
	cacheHits      int64
	cacheMiss      int64
	cacheHitsMutex *sync.Mutex
}

// entityHitsObj - manages hits for SMS, emails and OTPs
var entityHitsObj entityHits

// clientReponseMutex - serialises collectStatistics runs, which mutate clientResponseData
var clientReponseMutex *sync.Mutex

// dbHitsObj - manages hits for Mongo and MySQL
var dbHitsObj dbHits

// lastKept - maximum number of group report entries retained
var lastKept = 20

// stateCache - per-service statistics, keyed by "<service>_<branch>"; guarded by stateMutex
var stateCache map[string]Statistic

// queryCache - per-query statistics, keyed by query name; guarded by stateMutex
var queryCache map[string]Statistic

var stateMutex = &sync.Mutex{}

var cacheStatistic *cacheStates

// groupCount - hits since the last collectStatistics run; guarded by groupMutex
var groupCount int64

var groupMutex = &sync.Mutex{}

// serverStartTime - server start time
var serverStartTime time.Time

var clientResponseData clientResponse

// initStatus - true once Init has completed successfully; every update
// function is a no-op while it is false
var initStatus = false

// init is intentionally empty: all setup is performed by Init, which
// receives the project name from the caller.
func init() {}

// Init - initialises the statistics module for the project called name and
// schedules collectStatistics to run every 30 seconds.
//
// Previously persisted statistics are loaded from the file returned by
// getFilePath. If that content cannot be unmarshalled, or the cron job cannot
// be registered, the error is logged and the module stays disabled
// (initStatus remains false).
//
// Init replaces the package-level mutexes and caches without any
// synchronisation, so it should be called once, before any other function of
// this package is used.
func Init(name string) {
	projectName = name
	cacheStatistic = &cacheStates{
		cacheHitsMutex: &sync.Mutex{},
	}
	clientReponseMutex = &sync.Mutex{}
	entityHitsObj.Mutex = &sync.Mutex{}
	dbHitsObj.Mutex = &sync.Mutex{}
	serverStartTime = time.Now()

	ba := readStatisticsFile()
	unmarshalErr := ffjson.Unmarshal(ba, &clientResponseData)
	if errormdl.CheckErr(unmarshalErr) != nil {
		loggermdl.LogError(unmarshalErr)
		return
	}

	// A file without these sections leaves the maps nil; writing to a nil
	// map panics, so make sure both exist.
	stateCache = clientResponseData.ServicesState
	if stateCache == nil {
		stateCache = make(map[string]Statistic)
	}
	queryCache = clientResponseData.QueryState
	if queryCache == nil {
		queryCache = make(map[string]Statistic)
	}
	clientResponseData.NextTime = time.Now().Add(600 * time.Second)

	c := cron.New()
	// Without a registered job nothing would ever be flushed to disk, so a
	// registration failure leaves the module disabled instead of silently
	// accumulating counters forever.
	if _, cronErr := c.AddFunc("@every 30s", collectStatistics); cronErr != nil {
		loggermdl.LogError(cronErr)
		return
	}
	c.Start()
	initStatus = true
}

// readStatisticsFile - returns the content of the statistics file, or the
// empty JSON object "{}" when the file is not available or cannot be read
// (read errors are logged).
func readStatisticsFile() []byte {
	filePath := getFilePath()
	if !filemdl.FileAvailabilityCheck(filePath) {
		return []byte("{}")
	}
	ba, readErr := filemdl.ReadFile(filePath)
	if errormdl.CheckErr(readErr) != nil {
		loggermdl.LogError(readErr)
		return []byte("{}")
	}
	return ba
}

// getFilePath - returns the statistics file path: ./statistics.json when no
// project name is set, ./statistics/<projectName>.json otherwise.
func getFilePath() string {
	if projectName == "" {
		return "./statistics.json"
	}
	return "./statistics/" + projectName + ".json"
}

// updateGlobalHit - counts one hit in both the global total and the current
// group window.
func updateGlobalHit() {
	if !initStatus {
		return
	}
	cacheStatistic.cacheHitsMutex.Lock()
	updateGroupCache(cacheStatistic.totalHits)
	cacheStatistic.totalHits++
	cacheStatistic.cacheHitsMutex.Unlock()
}

// updateGroupCache - increments the hit counter of the current group window.
// hitCount is currently unused.
func updateGroupCache(hitCount int64) {
	if !initStatus {
		return
	}
	groupMutex.Lock()
	groupCount++
	groupMutex.Unlock()
}

// UpdateServiceState - update entry of service in state map, using the
// "main" branch
func UpdateServiceState(serviceName string, servingTime time.Duration, serviceError error, isRestricted, isRoleBased bool) {
	UpdateServiceStateWithBranch(serviceName, "main", servingTime, serviceError, isRestricted, isRoleBased)
}

// concatenateNameWithBranch - builds the state map key "<name>_<branch>";
// an empty branch is treated as "main".
func concatenateNameWithBranch(name, branch string) string {
	if branch == "" {
		return name + "_main"
	}
	return name + "_" + branch
}

// recordStatisticHit - folds the outcome of one call into stat. Every call
// counts as a hit. A failed call (serviceError != nil) updates only the error
// fields; a successful call updates the total, maximum and minimum time.
func recordStatisticHit(stat *Statistic, servingTime time.Duration, serviceError error) {
	stat.TotalHits++
	if serviceError != nil {
		stat.ErrorCount++
		stat.LastError = serviceError.Error()
		ct := time.Now()
		stat.ErrorTime = &ct
		return
	}
	stat.TotalTime += servingTime
	if servingTime > stat.MaxTime {
		stat.MaxTime = servingTime
	}
	// MinTime == 0 is treated as "not recorded yet".
	if servingTime < stat.MinTime || stat.MinTime == 0 {
		stat.MinTime = servingTime
	}
}

// UpdateServiceStateWithBranch - update entry of service from a branch in
// state map and count one global hit. isRestricted and isRoleBased are only
// stored when the entry is created. No-op until Init has succeeded.
func UpdateServiceStateWithBranch(serviceName, branch string, servingTime time.Duration, serviceError error, isRestricted, isRoleBased bool) {
	if !initStatus {
		return
	}
	stateMutex.Lock()
	key := concatenateNameWithBranch(serviceName, branch)
	serviceState, ok := stateCache[key]
	if !ok {
		serviceState = Statistic{
			ServiceName:  serviceName,
			Name:         serviceName,
			IsRestricted: isRestricted,
			IsRoleBased:  isRoleBased,
			Branch:       branch,
		}
	}
	recordStatisticHit(&serviceState, servingTime, serviceError)
	stateCache[key] = serviceState
	stateMutex.Unlock()

	// Called after releasing stateMutex so the state lock is never held
	// while the hit-counter locks are taken.
	updateGlobalHit()
}

// UpdateQueryState - update entry of query in query map. name,
// isRestricted and isRoleBased are only stored when the entry is created.
// No-op until Init has succeeded.
func UpdateQueryState(queryName string, name string, servingTime time.Duration, serviceError error, isRestricted, isRoleBased bool) {
	if !initStatus {
		return
	}
	stateMutex.Lock()
	defer stateMutex.Unlock()
	queryState, ok := queryCache[queryName]
	if !ok {
		queryState = Statistic{
			ServiceName:  queryName,
			Name:         name,
			IsRestricted: isRestricted,
			IsRoleBased:  isRoleBased,
		}
	}
	recordStatisticHit(&queryState, servingTime, serviceError)
	queryCache[queryName] = queryState
}

// UpdateGlobalServiceCacheState - update only cache hits and miss count for all services
func UpdateGlobalServiceCacheState(cacheHit bool) {
	if !initStatus {
		return
	}
	cacheStatistic.cacheHitsMutex.Lock()
	defer cacheStatistic.cacheHitsMutex.Unlock()
	if cacheHit {
		cacheStatistic.cacheHits++
	} else {
		cacheStatistic.cacheMiss++
	}
}

// EmailHits - update only email hits count for all services
func EmailHits() {
	if !initStatus {
		return
	}
	entityHitsObj.Mutex.Lock()
	defer entityHitsObj.Mutex.Unlock()
	entityHitsObj.TotalEmail++
}

// OTPHits - update only OTPs hits count for all services -- DPK [12-June-2018]
func OTPHits() {
	if !initStatus {
		return
	}
	entityHitsObj.Mutex.Lock()
	defer entityHitsObj.Mutex.Unlock()
	entityHitsObj.TotalOTP++
}

// OTPInCache - store count as the latest OTP-in-cache value. Unlike the hit
// counters this is overwritten, not incremented; collectStatistics copies it
// into the report and resets it to 0. -- DPK [12-June-2018]
func OTPInCache(count int64) {
	if !initStatus {
		return
	}
	entityHitsObj.Mutex.Lock()
	defer entityHitsObj.Mutex.Unlock()
	entityHitsObj.OTPInCache = count
}

// SMSHits - update only sms hits count for all services
func SMSHits() {
	if !initStatus {
		return
	}
	entityHitsObj.Mutex.Lock()
	defer entityHitsObj.Mutex.Unlock()
	entityHitsObj.TotalSMS++
}

// MongoHits - update only mongo hits count for all services
func MongoHits() {
	if !initStatus {
		return
	}
	dbHitsObj.Mutex.Lock()
	defer dbHitsObj.Mutex.Unlock()
	dbHitsObj.MongoHits++
}

// MySQLHits - update only MySQL hits count for all services
func MySQLHits() {
	if !initStatus {
		return
	}
	dbHitsObj.Mutex.Lock()
	defer dbHitsObj.Mutex.Unlock()
	dbHitsObj.MySQLHits++
}

// collectStatistics - drains every interval counter into clientResponseData,
// updates the group report and writes the whole document to the statistics
// file. It is the cron job registered by Init; clientReponseMutex serialises
// runs. Marshal and write errors are logged. No-op until Init has succeeded.
func collectStatistics() {
	if !initStatus {
		return
	}
	clientReponseMutex.Lock()
	defer clientReponseMutex.Unlock()

	clientResponseData.StartTime = serverStartTime

	// Cache and total hits collect
	cacheStatistic.cacheHitsMutex.Lock()
	clientResponseData.TotalHits += cacheStatistic.totalHits
	clientResponseData.CacheHits += cacheStatistic.cacheHits
	clientResponseData.CacheMiss += cacheStatistic.cacheMiss
	cacheStatistic.totalHits = 0
	cacheStatistic.cacheMiss = 0
	cacheStatistic.cacheHits = 0
	cacheStatistic.cacheHitsMutex.Unlock()

	// Entity hits collect. OTPInCache is a snapshot, so it is assigned
	// rather than added.
	entityHitsObj.Mutex.Lock()
	clientResponseData.TotalEmail += entityHitsObj.TotalEmail
	entityHitsObj.TotalEmail = 0
	clientResponseData.OTPInCache = entityHitsObj.OTPInCache
	entityHitsObj.OTPInCache = 0
	clientResponseData.TotalOTP += entityHitsObj.TotalOTP
	entityHitsObj.TotalOTP = 0
	clientResponseData.TotalSMS += entityHitsObj.TotalSMS
	entityHitsObj.TotalSMS = 0
	entityHitsObj.Mutex.Unlock()

	// DB hits collect
	dbHitsObj.Mutex.Lock()
	clientResponseData.TotalMongoHits += dbHitsObj.MongoHits
	dbHitsObj.MongoHits = 0
	clientResponseData.TotalMySQLHits += dbHitsObj.MySQLHits
	dbHitsObj.MySQLHits = 0
	dbHitsObj.Mutex.Unlock()

	// Group report: open a new window when the current one has expired (or
	// none exists yet), otherwise add the hits to the latest window.
	// NOTE(review): Init opens the first window for 600s while later windows
	// last 30s - confirm which duration is intended.
	groupMutex.Lock()
	current := time.Now()
	if current.After(clientResponseData.NextTime) || len(clientResponseData.GroupReport) == 0 {
		gr := groupResponse{}
		gr.GroupTime = current.String()
		gr.Hits = groupCount
		clientResponseData.GroupReport = append(clientResponseData.GroupReport, gr)
		clientResponseData.NextTime = time.Now().Add(30 * time.Second)
	} else {
		clientResponseData.GroupReport[len(clientResponseData.GroupReport)-1].Hits += groupCount
	}
	// Keep only the most recent lastKept windows.
	length := len(clientResponseData.GroupReport)
	if length > lastKept {
		clientResponseData.GroupReport = clientResponseData.GroupReport[length-lastKept:]
	}
	groupCount = 0
	groupMutex.Unlock()

	// The marshal function reads clientResponseData.ServicesState and
	// clientResponseData.QueryState. Both are maps, so they share storage
	// with stateCache and queryCache, which the update functions mutate
	// under stateMutex. The lock is held only while marshalling and released
	// before the disk write so that file I/O never blocks the update
	// functions.
	stateMutex.Lock()
	clientResponseData.ServicesState = stateCache
	clientResponseData.QueryState = queryCache
	ba, marshalError := ffjson.Marshal(clientResponseData)
	stateMutex.Unlock()
	if errormdl.CheckErr(marshalError) != nil {
		loggermdl.LogError(marshalError)
		return
	}

	writeErr := filemdl.WriteFile(getFilePath(), ba, true, false)
	if errormdl.CheckErr(writeErr) != nil {
		loggermdl.LogError(writeErr)
	}
}