修改:1、aliyun上报服务中节点上报增加了lock锁,临时解决上报问题

This commit is contained in:
pengwang 2021-02-22 20:42:05 +08:00
parent 9e9becea8f
commit 874436ad50
3 changed files with 121 additions and 56 deletions

View File

@ -257,7 +257,7 @@ func (c *CommunicationManageTemplate) CommunicationStateMachine(cmd Communicatio
}
}
tc := time.Since(startT) //计算耗时
setting.Logger.Debugf("time cost = %v\n", tc)
setting.Logger.Debugf("%v: ,time cost = %v\n", c.CollInterface.CollInterfaceName, tc)
//更新设备在线数量
c.CollInterface.DeviceNodeOnlineCnt = 0
@ -297,7 +297,7 @@ func (c *CommunicationManageTemplate) CommunicationManageDel() {
select {
case cmd := <-c.CommonRequestChan:
{
setting.Logger.Debugf("commChanLen %v\n", len(c.CommonRequestChan))
setting.Logger.Debugf("%v:,commChanLen %v\n", c.CollInterface.CollInterfaceName, len(c.CommonRequestChan))
c.CommunicationStateMachine(cmd)
GetDeviceOnline()

View File

@ -33,7 +33,7 @@ func apiAddNetwork(context *gin.Context) {
Name: networkParam.Name,
DHCP: networkParam.DHCP,
IP: networkParam.IP,
Netmask: networkParam.DHCP,
Netmask: networkParam.Netmask,
Gateway: networkParam.Gateway,
}

View File

@ -12,6 +12,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)
@ -86,6 +87,8 @@ var ReportServiceParamListAliyun = &ReportServiceParamListAliyunTemplate{
ServiceList: make([]*ReportServiceParamAliyunTemplate, 0),
}
var lock sync.Mutex
func init() {
ReportServiceParamListAliyun.ReadParamFromJson()
@ -284,7 +287,7 @@ func (r *ReportServiceParamAliyunTemplate) NodeLogin(name []string) bool {
}
sendMessage.DeviceName = append(sendMessage.DeviceName, v.Param.DeviceName)
r.SendMessageMap = append(r.SendMessageMap, sendMessage)
setting.Logger.Debugf("service:%s,sendMessageMapPre %v", r.GWParam.ServiceName, r.SendMessageMap)
setting.Logger.Debugf("service:%s,sendMessageMapAdd %v", r.GWParam.ServiceName, r.SendMessageMap)
//超时3s
time.AfterFunc(5*time.Second, func() {
for i, s := range r.SendMessageMap {
@ -407,8 +410,7 @@ func (r *ReportServiceParamAliyunTemplate) GWPropertyPost() {
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
MsgId := MQTTAliyunGWPropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, valueMap)
MsgId = MsgId - 1
MsgId := MQTTAliyunGWPropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, valueMap) - 1
MsgIdStr := strconv.Itoa(MsgId)
sendMessage := ReportServiceSendMessageAliyunTemplate{
@ -416,6 +418,7 @@ func (r *ReportServiceParamAliyunTemplate) GWPropertyPost() {
}
sendMessage.DeviceName = append(sendMessage.DeviceName, r.GWParam.Param.DeviceName)
r.SendMessageMap = append(r.SendMessageMap, sendMessage)
setting.Logger.Debugf("service:%s,sendMessageMapAdd %v", r.GWParam.ServiceName, r.SendMessageMap)
//上报故障先加收到正确回应后清0
r.GWParam.ReportErrCnt++
setting.Logger.Debugf("service %s,gw ReportErrCnt %d", r.GWParam.Param.DeviceName, r.GWParam.ReportErrCnt)
@ -485,7 +488,7 @@ func (r *ReportServiceParamAliyunTemplate) AllNodePropertyPost() {
sendMessage.DeviceName = append(sendMessage.DeviceName, v.DeviceName)
}
r.SendMessageMap = append(r.SendMessageMap, sendMessage)
setting.Logger.Debugf("service:%s,sendMessageMapAdd %v", r.GWParam.ServiceName, r.SendMessageMap)
} else { //最后一页
NodeValueMap := make([]MQTTAliyunNodeValueTemplate, 0)
valueMap := make([]MQTTAliyunValueTemplate, 0)
@ -524,7 +527,7 @@ func (r *ReportServiceParamAliyunTemplate) AllNodePropertyPost() {
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
//setting.Logger.Debugf("NodeValueMap %v", NodeValueMap)
MsgId := MQTTAliyunNodePropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, NodeValueMap)
MsgId := MQTTAliyunNodePropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, NodeValueMap) - 1
MsgIdStr := strconv.Itoa(MsgId)
sendMessage := ReportServiceSendMessageAliyunTemplate{
@ -534,7 +537,7 @@ func (r *ReportServiceParamAliyunTemplate) AllNodePropertyPost() {
sendMessage.DeviceName = append(sendMessage.DeviceName, v.DeviceName)
}
r.SendMessageMap = append(r.SendMessageMap, sendMessage)
setting.Logger.Debugf("service:%s,sendMessageMapNodes %v", r.GWParam.ServiceName, r.SendMessageMap)
setting.Logger.Debugf("service:%s,sendMessageMapAdd %v", r.GWParam.ServiceName, r.SendMessageMap)
}
}
}
@ -542,18 +545,35 @@ func (r *ReportServiceParamAliyunTemplate) AllNodePropertyPost() {
//指定设备上传属性
func (r *ReportServiceParamAliyunTemplate) NodePropertyPost(name []string) {
NodeValueMap := make([]MQTTAliyunNodeValueTemplate, 0)
valueMap := make([]MQTTAliyunValueTemplate, 0)
nodeList := make([]ReportServiceNodeParamAliyunTemplate, 0)
for _, n := range name {
for k, v := range r.NodeList {
if n == v.Name {
nodeList = append(nodeList, v)
//上报故障计数值先加收到正确回应后清0
r.NodeList[k].ReportErrCnt++
}
}
}
for _, a := range name {
for k, n := range r.NodeList {
if a == n.Name {
deviceName := n.Param.DeviceName
productKey := n.Param.ProductKey
pageCnt := len(nodeList) / 20 //单包最大发送20个设备
if len(nodeList)%20 != 0 {
pageCnt += 1
}
//log.Printf("pageCnt %v\n", pageCnt)
for pageIndex := 0; pageIndex < pageCnt; pageIndex++ {
//log.Printf("pageIndex %v\n", pageIndex)
if pageIndex != (pageCnt - 1) {
NodeValueMap := make([]MQTTAliyunNodeValueTemplate, 0)
valueMap := make([]MQTTAliyunValueTemplate, 0)
node := nodeList[20*pageIndex : 20*pageIndex+20]
//log.Printf("nodeList %v\n", node)
for _, n := range node {
for _, c := range device.CollectInterfaceMap {
if n.CollInterfaceName == c.CollInterfaceName {
if c.CollInterfaceName == n.CollInterfaceName {
for _, d := range c.DeviceNodeMap {
if a == d.Name {
if d.Name == n.Name {
for _, v := range d.VariableMap {
if len(v.Value) >= 1 {
index := len(v.Value) - 1
@ -563,43 +583,87 @@ func (r *ReportServiceParamAliyunTemplate) NodePropertyPost(name []string) {
valueMap = append(valueMap, mqttAliyunValue)
}
}
NodeValue := MQTTAliyunNodeValueTemplate{}
NodeValue.ValueMap = valueMap
NodeValue.ProductKey = n.Param.ProductKey
NodeValue.DeviceName = n.Param.DeviceName
NodeValueMap = append(NodeValueMap, NodeValue)
}
}
}
}
NodeValue := MQTTAliyunNodeValueTemplate{}
NodeValue.ValueMap = valueMap
NodeValue.ProductKey = productKey
NodeValue.DeviceName = deviceName
NodeValueMap = append(NodeValueMap, NodeValue)
//上报故障计数值先加收到正确回应后清0
r.NodeList[k].ReportErrCnt++
setting.Logger.Debugf("service %s,%s ReportErrCnt %d", r.GWParam.Param.DeviceName, n.Name, r.NodeList[k].ReportErrCnt)
}
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
MsgId := MQTTAliyunNodePropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, NodeValueMap) - 1
MsgIdStr := strconv.Itoa(MsgId)
sendMessage := ReportServiceSendMessageAliyunTemplate{
ID: MsgIdStr,
}
for _, v := range NodeValueMap {
sendMessage.DeviceName = append(sendMessage.DeviceName, v.DeviceName)
}
r.SendMessageMap = append(r.SendMessageMap, sendMessage)
setting.Logger.Debugf("service:%s,sendMessageMapAdd %v", r.GWParam.ServiceName, r.SendMessageMap)
} else { //最后一页
NodeValueMap := make([]MQTTAliyunNodeValueTemplate, 0)
valueMap := make([]MQTTAliyunValueTemplate, 0)
node := nodeList[20*pageIndex : len(nodeList)]
//log.Printf("nodeList %v\n", node)
for _, n := range node {
for _, c := range device.CollectInterfaceMap {
if c.CollInterfaceName == n.CollInterfaceName {
for _, d := range c.DeviceNodeMap {
if d.Name == n.Name {
for _, v := range d.VariableMap {
if len(v.Value) >= 1 {
index := len(v.Value) - 1
mqttAliyunValue := MQTTAliyunValueTemplate{}
mqttAliyunValue.Name = v.Name
mqttAliyunValue.Value = v.Value[index].Value
valueMap = append(valueMap, mqttAliyunValue)
}
}
NodeValue := MQTTAliyunNodeValueTemplate{}
NodeValue.ValueMap = valueMap
NodeValue.ProductKey = n.Param.ProductKey
NodeValue.DeviceName = n.Param.DeviceName
NodeValueMap = append(NodeValueMap, NodeValue)
}
}
}
}
}
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
//setting.Logger.Debugf("NodeValueMap %v", NodeValueMap)
MsgId := MQTTAliyunNodePropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, NodeValueMap) - 1
MsgIdStr := strconv.Itoa(MsgId)
sendMessage := ReportServiceSendMessageAliyunTemplate{
ID: MsgIdStr,
}
for _, v := range NodeValueMap {
sendMessage.DeviceName = append(sendMessage.DeviceName, v.DeviceName)
}
r.SendMessageMap = append(r.SendMessageMap, sendMessage)
setting.Logger.Debugf("service:%s,sendMessageMapAdd %v", r.GWParam.ServiceName, r.SendMessageMap)
}
}
//if len(valueMap) > 0 {
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
MsgId := MQTTAliyunNodePropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, NodeValueMap) - 1
MsgIdStr := strconv.Itoa(MsgId)
sendMessage := ReportServiceSendMessageAliyunTemplate{
ID: MsgIdStr,
}
for _, v := range NodeValueMap {
sendMessage.DeviceName = append(sendMessage.DeviceName, v.DeviceName)
}
r.SendMessageMap = append(r.SendMessageMap, sendMessage)
setting.Logger.Debugf("service:%s,sendMessageMapNode %v", r.GWParam.ServiceName, r.SendMessageMap)
}
func (r *ReportServiceParamAliyunTemplate) PropertyPost() {
@ -718,7 +782,7 @@ func ReportServiceAliyunProcessMessage(r *ReportServiceParamAliyunTemplate, topi
Code: property.Code,
ID: property.ID,
}
lock.Lock()
for k, v := range r.SendMessageMap {
if v.ID == ackMessage.ID {
for _, name := range v.DeviceName {
@ -726,22 +790,23 @@ func ReportServiceAliyunProcessMessage(r *ReportServiceParamAliyunTemplate, topi
r.GWParam.ReportStatus = "onLine"
r.GWParam.ReportErrCnt = 0
setting.Logger.Infof("service:%s,gw online", r.GWParam.ServiceName)
r.SendMessageMap = append(r.SendMessageMap[:k], r.SendMessageMap[k+1:]...)
setting.Logger.Debugf("service:%s,sendMessageMapNow %v", r.GWParam.ServiceName, r.SendMessageMap)
} else { //末端设备
for i, n := range r.NodeList {
if name == n.Param.DeviceName {
r.NodeList[i].ReportStatus = "onLine"
r.NodeList[i].ReportErrCnt = 0
setting.Logger.Infof("service:%s,%s online", r.GWParam.ServiceName, n.Param.DeviceName)
r.SendMessageMap = append(r.SendMessageMap[:k], r.SendMessageMap[k+1:]...)
setting.Logger.Debugf("service:%s,sendMessageMapNow %v", r.GWParam.ServiceName, r.SendMessageMap)
}
}
}
}
setting.Logger.Debugf("service:%s,sendMessageMapPre %v", r.GWParam.ServiceName, r.SendMessageMap)
setting.Logger.Debugf("k:%v,id:%v", k, v.ID)
r.SendMessageMap = append(r.SendMessageMap[:k], r.SendMessageMap[k+1:]...)
setting.Logger.Debugf("service:%s,sendMessageMapNow %v", r.GWParam.ServiceName, r.SendMessageMap)
}
}
lock.Unlock()
}
} else if strings.Contains(topic, "/combine/batch_login_reply") { //子设备上线回应
type MQTTAliyunLogInDataTemplate struct {
@ -769,7 +834,6 @@ func ReportServiceAliyunProcessMessage(r *ReportServiceParamAliyunTemplate, topi
Code: property.Code,
ID: property.ID,
}
//setting.Logger.Debugf("service:%s,sendMessageMapPre %v", r.GWParam.ServiceName, r.SendMessageMap)
for k, v := range r.SendMessageMap {
if v.ID == ackMessage.ID {
for _, name := range v.DeviceName {
@ -777,11 +841,12 @@ func ReportServiceAliyunProcessMessage(r *ReportServiceParamAliyunTemplate, topi
if name == n.Param.DeviceName {
r.NodeList[i].ReportStatus = "onLine"
r.NodeList[i].ReportErrCnt = 0
r.SendMessageMap = append(r.SendMessageMap[:k], r.SendMessageMap[k+1:]...)
setting.Logger.Debugf("service:%s,sendMessageMapNow %v", r.GWParam.ServiceName, r.SendMessageMap)
}
}
}
r.SendMessageMap = append(r.SendMessageMap[:k], r.SendMessageMap[k+1:]...)
setting.Logger.Debugf("service:%s,sendMessageMapNow %v", r.GWParam.ServiceName, r.SendMessageMap)
}
}
}