优化:1、上报服务先判断网关通信是否成功,再进行节点上报 2、数据上报时判断返回是否成功,增加超时判断

This commit is contained in:
pengwang 2020-12-03 16:39:22 +08:00
parent d60fd6c0fa
commit 7e0ad9247a
2 changed files with 70 additions and 70 deletions

33
main.go
View File

@ -3,18 +3,12 @@ package main
import (
"fmt"
"github.com/robfig/cron"
"goAdapter/report"
"golang.org/x/sync/errgroup"
"goAdapter/device"
"goAdapter/httpServer"
"goAdapter/report"
"goAdapter/setting"
)
var (
g errgroup.Group
)
func main() {
@ -47,7 +41,7 @@ func main() {
// 定义一个cron运行器
cronProcess := cron.New()
// 定时5秒每5秒执行print5
cronProcess.AddFunc("*/5 * * * * *", setting.NetworkParamList.GetNetworkParam)
_ = cronProcess.AddFunc("*/5 * * * * *", setting.NetworkParamList.GetNetworkParam)
// 定时
for k, v := range device.CollectInterfaceMap {
@ -56,16 +50,16 @@ func main() {
str := fmt.Sprintf("@every %dm%ds", v.PollPeriod/60, v.PollPeriod%60)
setting.Logger.Infof("str %+v", str)
cronProcess.AddFunc(str, device.CommunicationManage[k].CommunicationManagePoll)
_ = cronProcess.AddFunc(str, device.CommunicationManage[k].CommunicationManagePoll)
go device.CommunicationManage[k].CommunicationManageDel()
}
// 定时60秒,定时获取系统信息
cronProcess.AddFunc("*/60 * * * * *", setting.CollectSystemParam)
_ = cronProcess.AddFunc("*/60 * * * * *", setting.CollectSystemParam)
// 每天0点,定时获取NTP服务器的时间并校时
cronProcess.AddFunc("0 0 0 * * ?", func(){
_ = cronProcess.AddFunc("0 0 0 * * ?", func(){
setting.NTPGetTime()
})
@ -80,22 +74,5 @@ func main() {
report.ReportServiceInit()
/**************httpserver初始化****************/
// 默认启动方式,包含 Logger、Recovery 中间件
//serverWeb := &http.Server{
// Addr: ":8080",
// Handler: httpServer.RouterWeb(),
// ReadTimeout: 5 * time.Second,
// WriteTimeout: 10 * time.Second,
//}
//
//g.Go(func() error {
// return serverWeb.ListenAndServe()
//})
//
//if err := g.Wait(); err != nil {
// setting.Logger.Fatal(err)
//}
httpServer.RouterWeb()
}

View File

@ -220,9 +220,10 @@ func (r *ReportServiceParamAliyunTemplate) GWLogin() bool {
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
_, r.GWParam.MQTTClient = mqttClient.MQTTAliyunGWLogin(mqttAliyunRegister, GWPublishHandler)
status := false
status, r.GWParam.MQTTClient = mqttClient.MQTTAliyunGWLogin(mqttAliyunRegister, GWPublishHandler)
return true
return status
}
func (r *ReportServiceParamAliyunTemplate) NodeLogin(addr []string) bool {
@ -251,6 +252,7 @@ func (r *ReportServiceParamAliyunTemplate) NodeLogin(addr []string) bool {
}
mqttClient.MQTTAliyunNodeLoginIn(r.GWParam.MQTTClient, mqttAliyunRegister, nodeList)
timerOut := time.NewTimer(500 * time.Millisecond)
select {
case ackMessage := <-r.MessageChan:
if strings.Contains(ackMessage.Topic, "/combine/batch_login_reply") {
@ -258,7 +260,8 @@ func (r *ReportServiceParamAliyunTemplate) NodeLogin(addr []string) bool {
} else {
log.Printf("Node combine/login err")
}
default:
case <-timerOut.C:
timerOut.Stop()
log.Printf("Node combine/login err")
}
@ -292,6 +295,7 @@ func (r *ReportServiceParamAliyunTemplate) NodeLogOut(addr []string) bool {
}
mqttClient.MQTTAliyunNodeLoginOut(r.GWParam.MQTTClient, mqttAliyunRegister, nodeList)
timerOut := time.NewTimer(500 * time.Millisecond)
select {
case ackMessage := <-r.MessageChan:
if strings.Contains(ackMessage.Topic, "/combine/batch_logout_reply") {
@ -299,7 +303,8 @@ func (r *ReportServiceParamAliyunTemplate) NodeLogOut(addr []string) bool {
} else {
log.Printf("Node combine/logout err")
}
default:
case <-timerOut.C:
timerOut.Stop()
log.Printf("Node combine/logout err")
}
@ -370,6 +375,7 @@ func (r *ReportServiceParamAliyunTemplate) GWPropertyPost() {
mqttClient.MQTTAliyunGWPropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, valueMap)
timerOut := time.NewTimer(500 * time.Millisecond)
select {
case ackMessage := <-r.MessageChan:
if strings.Contains(ackMessage.Topic, "/thing/event/property/pack/post_reply") {
@ -377,7 +383,8 @@ func (r *ReportServiceParamAliyunTemplate) GWPropertyPost() {
} else {
log.Printf("gw property post err")
}
default:
case <-timerOut.C:
timerOut.Stop()
log.Printf("gw property post err")
}
@ -423,6 +430,7 @@ func (r *ReportServiceParamAliyunTemplate) AllNodePropertyPost() {
mqttClient.MQTTAliyunNodePropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, NodeValueMap)
timerOut := time.NewTimer(500 * time.Millisecond)
select {
case ackMessage := <-r.MessageChan:
if strings.Contains(ackMessage.Topic, "/thing/event/property/pack/post_reply") {
@ -441,7 +449,8 @@ func (r *ReportServiceParamAliyunTemplate) AllNodePropertyPost() {
}
}
}
default:
case <-timerOut.C:
timerOut.Stop()
log.Printf("node property post err")
}
}
@ -494,6 +503,7 @@ func (r *ReportServiceParamAliyunTemplate) NodePropertyPost(addr []string) {
mqttClient.MQTTAliyunNodePropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, NodeValueMap)
timerOut := time.NewTimer(500 * time.Millisecond)
select {
case ackMessage := <-r.MessageChan:
if strings.Contains(ackMessage.Topic, "/thing/event/property/pack/post_reply") {
@ -520,66 +530,79 @@ func (r *ReportServiceParamAliyunTemplate) NodePropertyPost(addr []string) {
}
}
}
default:
case <-timerOut.C:
timerOut.Stop()
log.Printf("node property post err")
}
}
func ReportServiceAliyunPoll(r *ReportServiceParamAliyunTemplate) {
reportState := 0
// 定义一个cron运行器
cronProcess := cron.New()
str := fmt.Sprintf("@every %dm%ds", r.GWParam.ReportTime/60, r.GWParam.ReportTime%60)
setting.Logger.Infof("reportServiceAliyun %+v", str)
cronProcess.AddFunc(str, r.GWPropertyPost)
cronProcess.AddFunc(str, r.AllNodePropertyPost)
cronProcess.Start()
defer cronProcess.Stop()
addr := make([]string, 0)
r.GWLogin()
for {
switch reportState {
case 0:
{
if r.GWLogin() == true {
reportState = 1
//节点发生了上线
for _, c := range device.CollectInterfaceMap {
for i := 0; i < len(c.OnlineReportChan); i++ {
addr = append(addr, <-c.OnlineReportChan)
cronProcess.AddFunc(str, r.GWPropertyPost)
cronProcess.AddFunc(str, r.AllNodePropertyPost)
} else {
time.Sleep(5 * time.Second)
}
}
}
if len(addr) > 0 {
log.Printf("DeviceOnline %v\n", addr)
r.NodeLogin(addr)
addr = addr[0:0]
}
case 1:
{
//节点发生了上线
for _, c := range device.CollectInterfaceMap {
for i := 0; i < len(c.OnlineReportChan); i++ {
addr = append(addr, <-c.OnlineReportChan)
}
}
if len(addr) > 0 {
log.Printf("DeviceOnline %v\n", addr)
r.NodeLogin(addr)
addr = addr[0:0]
}
//节点发生了离线
for _, c := range device.CollectInterfaceMap {
for i := 0; i < len(c.OfflineReportChan); i++ {
addr = append(addr, <-c.OfflineReportChan)
}
}
if len(addr) > 0 {
log.Printf("DeviceOffline %v\n", addr)
r.NodeLogOut(addr)
addr = addr[0:0]
}
//节点发生了离线
for _, c := range device.CollectInterfaceMap {
for i := 0; i < len(c.OfflineReportChan); i++ {
addr = append(addr, <-c.OfflineReportChan)
}
}
if len(addr) > 0 {
log.Printf("DeviceOffline %v\n", addr)
r.NodeLogOut(addr)
addr = addr[0:0]
}
//节点有属性变化
for _, c := range device.CollectInterfaceMap {
for i := 0; i < len(c.PropertyReportChan); i++ {
addr = append(addr, <-c.PropertyReportChan)
//节点有属性变化
for _, c := range device.CollectInterfaceMap {
for i := 0; i < len(c.PropertyReportChan); i++ {
addr = append(addr, <-c.PropertyReportChan)
}
}
if len(addr) > 0 {
log.Printf("DevicePropertyChanged %v\n", addr)
r.NodePropertyPost(addr)
addr = addr[0:0]
}
}
}
if len(addr) > 0 {
log.Printf("DevicePropertyChanged %v\n", addr)
r.NodePropertyPost(addr)
addr = addr[0:0]
}
time.Sleep(100 * time.Millisecond)
}