修改:

1、修改Aliyun上报服务,采用协程方式处理,将上行和下行分开
This commit is contained in:
pengwang 2021-03-31 22:41:09 +08:00
parent 418cab1407
commit 37763a6e6c
9 changed files with 770 additions and 766 deletions

View File

@ -2,11 +2,12 @@ package main
import (
"fmt"
"github.com/robfig/cron"
"goAdapter/device"
"goAdapter/httpServer"
"goAdapter/report"
"goAdapter/setting"
"github.com/robfig/cron"
)
func main() {

View File

@ -1,21 +1,12 @@
package mqttAliyun
import (
"bytes"
"encoding/json"
MQTT "github.com/eclipse/paho.mqtt.golang"
"goAdapter/setting"
"strconv"
"time"
)
type MQTTAliyunRegisterTemplate struct {
RemoteIP string
RemotePort string
ProductKey string `json:"ProductKey"`
DeviceName string `json:"DeviceName"`
DeviceSecret string `json:"DeviceSecret"`
}
MQTT "github.com/eclipse/paho.mqtt.golang"
)
type MQTTAliyunNodeRegisterTemplate struct {
ProductKey string `json:"ProductKey"`
@ -63,66 +54,6 @@ func init() {
}
func MQTTAliyunGWLogin(param MQTTAliyunRegisterTemplate, publishHandler MQTT.MessageHandler) (bool, MQTT.Client) {
var raw_broker bytes.Buffer
//MQTT.DEBUG = log.New(os.Stdout, "[DEBUG] ", 0)
raw_broker.WriteString(param.ProductKey)
raw_broker.WriteString(param.RemoteIP)
opts := MQTT.NewClientOptions().AddBroker(raw_broker.String())
auth := MqttClient_CalculateSign(param.ProductKey,
param.DeviceName,
param.DeviceSecret, timeStamp)
opts.SetClientID(auth.mqttClientId)
opts.SetUsername(auth.username)
opts.SetPassword(auth.password)
opts.SetKeepAlive(60 * 2 * time.Second)
opts.SetDefaultPublishHandler(publishHandler)
opts.SetAutoReconnect(false)
// create and start a client using the above ClientOptions
mqttClient := MQTT.NewClient(opts)
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
setting.Logger.Errorf("Connect aliyun IoT Cloud fail,%s", token.Error())
return false, nil
}
setting.Logger.Info("Connect aliyun IoT Cloud Sucess")
subTopic := ""
//属性上报回应
subTopic = "/sys/" + param.ProductKey + "/" + param.DeviceName + "/thing/event/property/pack/post_reply"
MQTTAliyunSubscribeTopic(mqttClient, subTopic)
//属性设置
subTopic = "/sys/" + param.ProductKey + "/" + param.DeviceName + "/thing/service/property/set"
MQTTAliyunSubscribeTopic(mqttClient, subTopic)
//服务调用(服务不需要主动订阅,平台自动订阅)
//subTopic = "/sys/" + param.ProductKey + "/" + param.DeviceName + "/thing/service/RemoteCmdOpen"
//MQTTAliyunSubscribeTopic(mqttClient, subTopic)
//子设备注册
subTopic = "/sys/" + param.ProductKey + "/" + param.DeviceName + "/thing/sub/register_reply"
MQTTAliyunSubscribeTopic(mqttClient, subTopic)
return true, mqttClient
//MQTTClient_AddTopo()
//MQTTClient_Register()
}
func MQTTAliyunSubscribeTopic(client MQTT.Client, topic string) {
if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
setting.Logger.Warningf("Subscribe topic %s fail,%v", topic, token.Error())
}
setting.Logger.Info("Subscribe topic " + topic + " success")
}
func MQTTAliyunNodeLoginIn(client MQTT.Client, gw MQTTAliyunRegisterTemplate, node []MQTTAliyunNodeRegisterTemplate) int {
type NodeParamsTemplate struct {

View File

@ -0,0 +1,137 @@
package mqttAliyun
import (
"bytes"
"goAdapter/setting"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
type MQTTAliyunRegisterTemplate struct {
RemoteIP string
RemotePort string
ProductKey string `json:"ProductKey"`
DeviceName string `json:"DeviceName"`
DeviceSecret string `json:"DeviceSecret"`
}
func MQTTAliyunGWLogin(param MQTTAliyunRegisterTemplate, publishHandler MQTT.MessageHandler) (bool, MQTT.Client) {
var raw_broker bytes.Buffer
//MQTT.DEBUG = log.New(os.Stdout, "[DEBUG] ", 0)
raw_broker.WriteString(param.ProductKey)
raw_broker.WriteString(param.RemoteIP)
opts := MQTT.NewClientOptions().AddBroker(raw_broker.String())
auth := MqttClient_CalculateSign(param.ProductKey,
param.DeviceName,
param.DeviceSecret, timeStamp)
opts.SetClientID(auth.mqttClientId)
opts.SetUsername(auth.username)
opts.SetPassword(auth.password)
opts.SetKeepAlive(60 * 2 * time.Second)
opts.SetDefaultPublishHandler(publishHandler)
opts.SetAutoReconnect(false)
// create and start a client using the above ClientOptions
mqttClient := MQTT.NewClient(opts)
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
setting.Logger.Errorf("Connect aliyun IoT Cloud fail,%s", token.Error())
return false, nil
}
setting.Logger.Info("Connect aliyun IoT Cloud Sucess")
subTopic := ""
//属性上报回应
subTopic = "/sys/" + param.ProductKey + "/" + param.DeviceName + "/thing/event/property/pack/post_reply"
MQTTAliyunSubscribeTopic(mqttClient, subTopic)
//属性设置
subTopic = "/sys/" + param.ProductKey + "/" + param.DeviceName + "/thing/service/property/set"
MQTTAliyunSubscribeTopic(mqttClient, subTopic)
//服务调用(服务不需要主动订阅,平台自动订阅)
//subTopic = "/sys/" + param.ProductKey + "/" + param.DeviceName + "/thing/service/RemoteCmdOpen"
//MQTTAliyunSubscribeTopic(mqttClient, subTopic)
//子设备注册
subTopic = "/sys/" + param.ProductKey + "/" + param.DeviceName + "/thing/sub/register_reply"
MQTTAliyunSubscribeTopic(mqttClient, subTopic)
return true, mqttClient
//MQTTClient_AddTopo()
//MQTTClient_Register()
}
func MQTTAliyunSubscribeTopic(client MQTT.Client, topic string) {
if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
setting.Logger.Warningf("Subscribe topic %s fail,%v", topic, token.Error())
}
setting.Logger.Info("Subscribe topic " + topic + " success")
}
func (r *ReportServiceParamAliyunTemplate) GWLogin() bool {
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
status := false
status, r.GWParam.MQTTClient = MQTTAliyunGWLogin(mqttAliyunRegister, ReceiveMessageHandler)
if status == true {
r.GWParam.ReportStatus = "onLine"
}
return status
}
func (r *ReportServiceParamAliyunTemplate) NodeLogin(name []string) bool {
nodeList := make([]MQTTAliyunNodeRegisterTemplate, 0)
nodeParam := MQTTAliyunNodeRegisterTemplate{}
status := false
setting.Logger.Debugf("nodeLoginName %v", name)
for _, d := range name {
for k, v := range r.NodeList {
if d == v.Name {
nodeParam.DeviceSecret = v.Param.DeviceSecret
nodeParam.DeviceName = v.Param.DeviceName
nodeParam.ProductKey = v.Param.ProductKey
nodeList = append(nodeList, nodeParam)
r.NodeList[k].CommStatus = "onLine"
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
MQTTAliyunNodeLoginIn(r.GWParam.MQTTClient, mqttAliyunRegister, nodeList)
select {
case <-r.ReceiveLogInAckFrameChan:
{
status = true
}
case <-time.After(time.Millisecond * 2000):
{
status = false
}
}
}
}
}
return status
}

View File

@ -0,0 +1,54 @@
package mqttAliyun
import (
"goAdapter/setting"
"time"
)
func (r *ReportServiceParamAliyunTemplate) NodeLogOut(name []string) bool {
status := false
nodeList := make([]MQTTAliyunNodeRegisterTemplate, 0)
nodeParam := MQTTAliyunNodeRegisterTemplate{}
for _, d := range name {
for k, v := range r.NodeList {
if d == v.Name {
if v.ReportStatus == "offLine" {
setting.Logger.Infof("service:%s,%s is already offLine", r.GWParam.ServiceName, v.Name)
} else {
nodeParam.DeviceSecret = v.Param.DeviceSecret
nodeParam.DeviceName = v.Param.DeviceName
nodeParam.ProductKey = v.Param.ProductKey
nodeList = append(nodeList, nodeParam)
r.NodeList[k].CommStatus = "offLine"
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
MQTTAliyunNodeLoginOut(r.GWParam.MQTTClient, mqttAliyunRegister, nodeList)
select {
case frame := <-r.ReceiveLogOutAckFrameChan:
{
if frame.Code == 200 {
}
status = true
}
case <-time.After(time.Millisecond * 2000):
{
status = false
}
}
}
}
}
}
return status
}

View File

@ -0,0 +1,45 @@
package mqttAliyun
import (
"encoding/json"
"goAdapter/device"
"strings"
)
func ReportServiceAliyunProcessGetSubDeviceProperty(r *ReportServiceParamAliyunTemplate, message MQTTAliyunMessageTemplate,
gw MQTTAliyunRegisterTemplate, cmdName string) {
addrArray := strings.Split(message.Params["Addr"].(string), ",")
for _, v := range addrArray {
for _, n := range r.NodeList {
if v == n.Param.DeviceName {
cmd := device.CommunicationCmdTemplate{}
cmd.CollInterfaceName = "coll1"
cmd.DeviceName = n.Addr
cmd.FunName = "GetRealVariables"
paramStr, _ := json.Marshal(message.Params)
cmd.FunPara = string(paramStr)
if len(device.CommunicationManage) > 0 {
if device.CommunicationManage[0].CommunicationManageAddEmergency(cmd) == true {
payload := MQTTAliyunThingServiceAckTemplate{
Identifier: cmdName,
ID: message.ID,
Code: 200,
Data: make(map[string]interface{}),
}
MQTTAliyunThingServiceAck(r.GWParam.MQTTClient, gw, payload)
} else {
payload := MQTTAliyunThingServiceAckTemplate{
Identifier: cmdName,
ID: message.ID,
Code: 1000,
Data: make(map[string]interface{}),
}
MQTTAliyunThingServiceAck(r.GWParam.MQTTClient, gw, payload)
}
}
}
}
}
}

View File

@ -0,0 +1,76 @@
package mqttAliyun
import (
MQTT "github.com/eclipse/paho.mqtt.golang"
)
type MQTTAliyunReceiveFrameTemplate struct {
Topic string
Payload []byte
}
type MQTTAliyunLogInDataTemplate struct {
ProductKey string `json:"productKey"`
DeviceName string `json:"deviceName"`
}
type MQTTAliyunLogInAckTemplate struct {
ID string `json:"id"`
Code int32 `json:"code"`
Message string `json:"message"`
Data []MQTTAliyunLogInDataTemplate `json:"data"`
}
type MQTTAliyunLogOutDataTemplate struct {
Code int32 `json:"code"`
Message string `json:"message"`
ProductKey string `json:"productKey"`
DeviceName string `json:"deviceName"`
}
type MQTTAliyunLogOutAckTemplate struct {
ID string `json:"id"`
Code int32 `json:"code"`
Message string `json:"message"`
Data MQTTAliyunLogOutDataTemplate `json:"data"`
}
type MQTTAliyunReportPropertyAckTemplate struct {
Code int32 `json:"code"`
Data string `json:"-"`
ID string `json:"id"`
Message string `json:"message"`
Method string `json:"method"`
Version string `json:"version"`
}
//发送数据回调函数
func ReceiveMessageHandler(client MQTT.Client, msg MQTT.Message) {
for _, v := range ReportServiceParamListAliyun.ServiceList {
if v.GWParam.MQTTClient == client {
receiveFrame := MQTTAliyunReceiveFrameTemplate{
Topic: msg.Topic(),
Payload: msg.Payload(),
}
v.ReceiveFrameChan <- receiveFrame
}
}
}
//func ProcessPropertyPost(r *ReportServiceParamAliyunTemplate) {
//
// for {
// select {
// case postParam := <-r.PropertyPostChan:
// {
// setting.Logger.Tracef("service %s,postParam %v,postChanCnt %v", r.GWParam.ServiceName, postParam, len(r.PropertyPostChan))
// if postParam.DeviceType == 0 { //网关上报
// r.GWPropertyPost()
// } else if postParam.DeviceType == 1 { //末端设备上报
// r.NodePropertyPost(postParam.DeviceName)
// }
// }
// }
// }
//}

View File

@ -0,0 +1,338 @@
package mqttAliyun
import (
"goAdapter/device"
"goAdapter/setting"
"time"
)
func (r *ReportServiceParamAliyunTemplate) GWPropertyPost() {
valueMap := make([]MQTTAliyunValueTemplate, 0)
mqttAliyunValue := MQTTAliyunValueTemplate{}
mqttAliyunValue.Name = "MemTotal"
mqttAliyunValue.Value = setting.SystemState.MemTotal
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "MemUse"
mqttAliyunValue.Value = setting.SystemState.MemUse
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "DiskTotal"
mqttAliyunValue.Value = setting.SystemState.DiskTotal
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "DiskUse"
mqttAliyunValue.Value = setting.SystemState.DiskUse
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "Name"
mqttAliyunValue.Value = setting.SystemState.Name
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "SN"
mqttAliyunValue.Value = setting.SystemState.SN
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "HardVer"
mqttAliyunValue.Value = setting.SystemState.HardVer
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "SoftVer"
mqttAliyunValue.Value = setting.SystemState.SoftVer
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "SystemRTC"
mqttAliyunValue.Value = setting.SystemState.SystemRTC
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "RunTime"
mqttAliyunValue.Value = setting.SystemState.RunTime
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "DeviceOnline"
mqttAliyunValue.Value = setting.SystemState.DeviceOnline
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "DevicePacketLoss"
mqttAliyunValue.Value = setting.SystemState.DevicePacketLoss
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
//上报故障先加收到正确回应后清0
r.GWParam.ReportErrCnt++
setting.Logger.Debugf("service %s,gw ReportErrCnt %d", r.GWParam.Param.DeviceName, r.GWParam.ReportErrCnt)
MQTTAliyunGWPropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, valueMap)
select {
case frame := <-r.ReceiveReportNodePropertyAckFrameChan:
{
if frame.Code == 200 {
}
}
case <-time.After(time.Millisecond * 2000):
{
}
}
}
func (r *ReportServiceParamAliyunTemplate) AllNodePropertyPost() {
//上报故障计数值先加收到正确回应后清0
for i := 0; i < len(r.NodeList); i++ {
r.NodeList[i].ReportErrCnt++
}
pageCnt := len(r.NodeList) / 20 //单包最大发送20个设备
if len(r.NodeList)%20 != 0 {
pageCnt += 1
}
//log.Printf("pageCnt %v\n", pageCnt)
for pageIndex := 0; pageIndex < pageCnt; pageIndex++ {
//log.Printf("pageIndex %v\n", pageIndex)
if pageIndex != (pageCnt - 1) {
NodeValueMap := make([]MQTTAliyunNodeValueTemplate, 0)
valueMap := make([]MQTTAliyunValueTemplate, 0)
node := r.NodeList[20*pageIndex : 20*pageIndex+20]
//log.Printf("nodeList %v\n", node)
for _, n := range node {
for _, c := range device.CollectInterfaceMap {
if c.CollInterfaceName == n.CollInterfaceName {
for _, d := range c.DeviceNodeMap {
if d.Name == n.Name {
for _, v := range d.VariableMap {
if len(v.Value) >= 1 {
index := len(v.Value) - 1
mqttAliyunValue := MQTTAliyunValueTemplate{}
mqttAliyunValue.Name = v.Name
mqttAliyunValue.Value = v.Value[index].Value
valueMap = append(valueMap, mqttAliyunValue)
}
}
NodeValue := MQTTAliyunNodeValueTemplate{}
NodeValue.ValueMap = valueMap
NodeValue.ProductKey = n.Param.ProductKey
NodeValue.DeviceName = n.Param.DeviceName
NodeValueMap = append(NodeValueMap, NodeValue)
}
}
}
}
}
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
MQTTAliyunNodePropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, NodeValueMap)
select {
case frame := <-r.ReceiveReportNodePropertyAckFrameChan:
{
if frame.Code == 200 {
}
}
case <-time.After(time.Millisecond * 1000):
{
}
}
} else { //最后一页
NodeValueMap := make([]MQTTAliyunNodeValueTemplate, 0)
valueMap := make([]MQTTAliyunValueTemplate, 0)
node := r.NodeList[20*pageIndex : len(r.NodeList)]
//log.Printf("nodeList %v\n", node)
for _, n := range node {
for _, c := range device.CollectInterfaceMap {
if c.CollInterfaceName == n.CollInterfaceName {
for _, d := range c.DeviceNodeMap {
if d.Name == n.Name {
for _, v := range d.VariableMap {
if len(v.Value) >= 1 {
index := len(v.Value) - 1
mqttAliyunValue := MQTTAliyunValueTemplate{}
mqttAliyunValue.Name = v.Name
mqttAliyunValue.Value = v.Value[index].Value
valueMap = append(valueMap, mqttAliyunValue)
}
}
NodeValue := MQTTAliyunNodeValueTemplate{}
NodeValue.ValueMap = valueMap
NodeValue.ProductKey = n.Param.ProductKey
NodeValue.DeviceName = n.Param.DeviceName
NodeValueMap = append(NodeValueMap, NodeValue)
}
}
}
}
}
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
//setting.Logger.Debugf("NodeValueMap %v", NodeValueMap)
MQTTAliyunNodePropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, NodeValueMap)
select {
case frame := <-r.ReceiveReportNodePropertyAckFrameChan:
{
if frame.Code == 200 {
}
}
case <-time.After(time.Millisecond * 1000):
{
}
}
}
}
}
//指定设备上传属性
func (r *ReportServiceParamAliyunTemplate) NodePropertyPost(name []string) {
nodeList := make([]ReportServiceNodeParamAliyunTemplate, 0)
for _, n := range name {
for k, v := range r.NodeList {
if n == v.Name {
nodeList = append(nodeList, v)
//上报故障计数值先加收到正确回应后清0
r.NodeList[k].ReportErrCnt++
}
}
}
pageCnt := len(nodeList) / 20 //单包最大发送20个设备
if len(nodeList)%20 != 0 {
pageCnt += 1
}
//log.Printf("pageCnt %v\n", pageCnt)
for pageIndex := 0; pageIndex < pageCnt; pageIndex++ {
//log.Printf("pageIndex %v\n", pageIndex)
if pageIndex != (pageCnt - 1) {
NodeValueMap := make([]MQTTAliyunNodeValueTemplate, 0)
valueMap := make([]MQTTAliyunValueTemplate, 0)
node := nodeList[20*pageIndex : 20*pageIndex+20]
//log.Printf("nodeList %v\n", node)
for _, n := range node {
for _, c := range device.CollectInterfaceMap {
if c.CollInterfaceName == n.CollInterfaceName {
for _, d := range c.DeviceNodeMap {
if d.Name == n.Name {
for _, v := range d.VariableMap {
if len(v.Value) >= 1 {
index := len(v.Value) - 1
mqttAliyunValue := MQTTAliyunValueTemplate{}
mqttAliyunValue.Name = v.Name
mqttAliyunValue.Value = v.Value[index].Value
valueMap = append(valueMap, mqttAliyunValue)
}
}
NodeValue := MQTTAliyunNodeValueTemplate{}
NodeValue.ValueMap = valueMap
NodeValue.ProductKey = n.Param.ProductKey
NodeValue.DeviceName = n.Param.DeviceName
NodeValueMap = append(NodeValueMap, NodeValue)
}
}
}
}
}
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
MQTTAliyunNodePropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, NodeValueMap)
select {
case frame := <-r.ReceiveReportNodePropertyAckFrameChan:
{
if frame.Code == 200 {
}
}
case <-time.After(time.Millisecond * 1000):
{
}
}
} else { //最后一页
NodeValueMap := make([]MQTTAliyunNodeValueTemplate, 0)
valueMap := make([]MQTTAliyunValueTemplate, 0)
node := nodeList[20*pageIndex : len(nodeList)]
//log.Printf("nodeList %v\n", node)
for _, n := range node {
for _, c := range device.CollectInterfaceMap {
if c.CollInterfaceName == n.CollInterfaceName {
for _, d := range c.DeviceNodeMap {
if d.Name == n.Name {
for _, v := range d.VariableMap {
if len(v.Value) >= 1 {
index := len(v.Value) - 1
mqttAliyunValue := MQTTAliyunValueTemplate{}
mqttAliyunValue.Name = v.Name
mqttAliyunValue.Value = v.Value[index].Value
valueMap = append(valueMap, mqttAliyunValue)
}
}
NodeValue := MQTTAliyunNodeValueTemplate{}
NodeValue.ValueMap = valueMap
NodeValue.ProductKey = n.Param.ProductKey
NodeValue.DeviceName = n.Param.DeviceName
NodeValueMap = append(NodeValueMap, NodeValue)
}
}
}
}
}
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
//setting.Logger.Debugf("NodeValueMap %v", NodeValueMap)
MQTTAliyunNodePropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, NodeValueMap)
select {
case frame := <-r.ReceiveReportNodePropertyAckFrameChan:
{
if frame.Code == 200 {
}
}
case <-time.After(time.Millisecond * 1000):
{
}
}
}
}
}

View File

@ -0,0 +1 @@
package mqttAliyun

View File

@ -3,17 +3,16 @@ package mqttAliyun
import (
"encoding/json"
"fmt"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/robfig/cron"
"goAdapter/device"
"goAdapter/setting"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/robfig/cron"
)
//阿里云上报节点参数结构体
@ -50,32 +49,19 @@ type ReportServiceGWParamAliyunTemplate struct {
MQTTClient MQTT.Client `json:"-"`
}
//阿里云上报服务接收报文结构体
type ReportServiceReceiveMessageAliyunTemplate struct {
Topic string
Code int32
ID string
}
//阿里云上报服务发送报文结构体
type ReportServiceSendMessageAliyunTemplate struct {
Topic string
DeviceName []string //发送时会存在多个设备在一条报文里
ID string
}
type ReportServicePropertyPostAliyunTemplate struct {
DeviceType int
DeviceName []string
}
//阿里云上报服务参数,网关参数,节点参数
type ReportServiceParamAliyunTemplate struct {
GWParam ReportServiceGWParamAliyunTemplate
NodeList []ReportServiceNodeParamAliyunTemplate
ReceiveMessageMap []ReportServiceReceiveMessageAliyunTemplate `json:"-"`
SendMessageMap []ReportServiceSendMessageAliyunTemplate `json:"-"`
PropertyPostChan chan ReportServicePropertyPostAliyunTemplate `json:"-"`
GWParam ReportServiceGWParamAliyunTemplate
NodeList []ReportServiceNodeParamAliyunTemplate
ReceiveFrameChan chan MQTTAliyunReceiveFrameTemplate `json:"-"`
LogInRequestFrameChan chan []string `json:"-"` //上线
ReceiveLogInAckFrameChan chan MQTTAliyunLogInAckTemplate `json:"-"`
LogOutRequestFrameChan chan []string `json:"-"`
ReceiveLogOutAckFrameChan chan MQTTAliyunLogOutAckTemplate `json:"-"`
ReportGWPropertyRequestFrameChan chan bool `json:"-"`
ReceiveReportGWPropertyAckFrameChan chan MQTTAliyunReportPropertyAckTemplate `json:"-"`
ReportNodePropertyRequestFrameChan chan []string `json:"-"`
ReceiveReportNodePropertyAckFrameChan chan MQTTAliyunReportPropertyAckTemplate `json:"-"`
}
type ReportServiceParamListAliyunTemplate struct {
@ -87,20 +73,23 @@ var ReportServiceParamListAliyun = &ReportServiceParamListAliyunTemplate{
ServiceList: make([]*ReportServiceParamAliyunTemplate, 0),
}
var lock sync.Mutex
func init() {
ReportServiceParamListAliyun.ReadParamFromJson()
//初始化
for _, v := range ReportServiceParamListAliyun.ServiceList {
v.ReceiveMessageMap = make([]ReportServiceReceiveMessageAliyunTemplate, 0)
v.SendMessageMap = make([]ReportServiceSendMessageAliyunTemplate, 0)
v.PropertyPostChan = make(chan ReportServicePropertyPostAliyunTemplate, 10)
v.ReceiveFrameChan = make(chan MQTTAliyunReceiveFrameTemplate, 100)
v.LogInRequestFrameChan = make(chan []string, 0)
v.ReceiveLogInAckFrameChan = make(chan MQTTAliyunLogInAckTemplate, 5)
v.LogOutRequestFrameChan = make(chan []string, 0)
v.ReceiveLogOutAckFrameChan = make(chan MQTTAliyunLogOutAckTemplate, 5)
v.ReportGWPropertyRequestFrameChan = make(chan bool, 50)
v.ReceiveReportGWPropertyAckFrameChan = make(chan MQTTAliyunReportPropertyAckTemplate, 50)
v.ReportNodePropertyRequestFrameChan = make(chan []string, 50)
v.ReceiveReportNodePropertyAckFrameChan = make(chan MQTTAliyunReportPropertyAckTemplate, 50)
go ReportServiceAliyunPoll(v)
go ProcessPropertyPost(v)
}
}
@ -228,516 +217,74 @@ func (r *ReportServiceParamAliyunTemplate) DeleteReportNode(name string) int {
return index
}
//发送数据回调函数
func GWPublishHandler(client MQTT.Client, msg MQTT.Message) {
for _, v := range ReportServiceParamListAliyun.ServiceList {
if v.GWParam.MQTTClient == client {
go ReportServiceAliyunProcessMessage(v, msg.Topic(), msg.Payload())
}
}
}
func (r *ReportServiceParamAliyunTemplate) GWLogin() bool {
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
status := false
status, r.GWParam.MQTTClient = MQTTAliyunGWLogin(mqttAliyunRegister, GWPublishHandler)
if status == true {
r.GWParam.ReportStatus = "onLine"
}
return status
}
func (r *ReportServiceParamAliyunTemplate) NodeLogin(name []string) bool {
nodeList := make([]MQTTAliyunNodeRegisterTemplate, 0)
nodeParam := MQTTAliyunNodeRegisterTemplate{}
status := false
setting.Logger.Debugf("nodeLoginName %v", name)
for _, d := range name {
for k, v := range r.NodeList {
if d == v.Name {
nodeParam.DeviceSecret = v.Param.DeviceSecret
nodeParam.DeviceName = v.Param.DeviceName
nodeParam.ProductKey = v.Param.ProductKey
nodeList = append(nodeList, nodeParam)
r.NodeList[k].CommStatus = "onLine"
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
MsgId := MQTTAliyunNodeLoginIn(r.GWParam.MQTTClient, mqttAliyunRegister, nodeList) - 1
MsgIdStr := strconv.Itoa(MsgId)
sendMessage := ReportServiceSendMessageAliyunTemplate{
ID: MsgIdStr,
}
sendMessage.DeviceName = append(sendMessage.DeviceName, v.Param.DeviceName)
r.SendMessageMap = append(r.SendMessageMap, sendMessage)
setting.Logger.Debugf("service:%s,sendMessageMapAdd %v", r.GWParam.ServiceName, r.SendMessageMap)
//超时3s
time.AfterFunc(5*time.Second, func() {
for i, s := range r.SendMessageMap {
if s.ID == MsgIdStr {
r.SendMessageMap = append(r.SendMessageMap[:i], r.SendMessageMap[i+1:]...)
}
}
})
status = true
}
}
}
return status
}
func (r *ReportServiceParamAliyunTemplate) NodeLogOut(name []string) bool {
nodeList := make([]MQTTAliyunNodeRegisterTemplate, 0)
nodeParam := MQTTAliyunNodeRegisterTemplate{}
for _, d := range name {
for k, v := range r.NodeList {
if d == v.Name {
if v.ReportStatus == "offLine" {
setting.Logger.Infof("service:%s,%s is already offLine", r.GWParam.ServiceName, v.Name)
} else {
nodeParam.DeviceSecret = v.Param.DeviceSecret
nodeParam.DeviceName = v.Param.DeviceName
nodeParam.ProductKey = v.Param.ProductKey
nodeList = append(nodeList, nodeParam)
r.NodeList[k].CommStatus = "offLine"
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
MsgId := MQTTAliyunNodeLoginOut(r.GWParam.MQTTClient, mqttAliyunRegister, nodeList) - 1
MsgIdStr := strconv.Itoa(MsgId)
sendMessage := ReportServiceSendMessageAliyunTemplate{
ID: MsgIdStr,
}
for _, v := range nodeList {
sendMessage.DeviceName = append(sendMessage.DeviceName, v.DeviceName)
}
r.SendMessageMap = append(r.SendMessageMap, sendMessage)
}
}
}
}
return true
}
func (r *ReportServiceParamAliyunTemplate) GWPropertyPost() {
valueMap := make([]MQTTAliyunValueTemplate, 0)
mqttAliyunValue := MQTTAliyunValueTemplate{}
mqttAliyunValue.Name = "MemTotal"
mqttAliyunValue.Value = setting.SystemState.MemTotal
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "MemUse"
mqttAliyunValue.Value = setting.SystemState.MemUse
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "DiskTotal"
mqttAliyunValue.Value = setting.SystemState.DiskTotal
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "DiskUse"
mqttAliyunValue.Value = setting.SystemState.DiskUse
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "Name"
mqttAliyunValue.Value = setting.SystemState.Name
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "SN"
mqttAliyunValue.Value = setting.SystemState.SN
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "HardVer"
mqttAliyunValue.Value = setting.SystemState.HardVer
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "SoftVer"
mqttAliyunValue.Value = setting.SystemState.SoftVer
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "SystemRTC"
mqttAliyunValue.Value = setting.SystemState.SystemRTC
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "RunTime"
mqttAliyunValue.Value = setting.SystemState.RunTime
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "DeviceOnline"
mqttAliyunValue.Value = setting.SystemState.DeviceOnline
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunValue.Name = "DevicePacketLoss"
mqttAliyunValue.Value = setting.SystemState.DevicePacketLoss
valueMap = append(valueMap, mqttAliyunValue)
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
MsgId := MQTTAliyunGWPropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, valueMap) - 1
MsgIdStr := strconv.Itoa(MsgId)
sendMessage := ReportServiceSendMessageAliyunTemplate{
ID: MsgIdStr,
}
sendMessage.DeviceName = append(sendMessage.DeviceName, r.GWParam.Param.DeviceName)
r.SendMessageMap = append(r.SendMessageMap, sendMessage)
setting.Logger.Debugf("service:%s,sendMessageMapAdd %v", r.GWParam.ServiceName, r.SendMessageMap)
//上报故障先加收到正确回应后清0
r.GWParam.ReportErrCnt++
setting.Logger.Debugf("service %s,gw ReportErrCnt %d", r.GWParam.Param.DeviceName, r.GWParam.ReportErrCnt)
}
func (r *ReportServiceParamAliyunTemplate) AllNodePropertyPost() {
//上报故障计数值先加收到正确回应后清0
for i := 0; i < len(r.NodeList); i++ {
r.NodeList[i].ReportErrCnt++
}
pageCnt := len(r.NodeList) / 20 //单包最大发送20个设备
if len(r.NodeList)%20 != 0 {
pageCnt += 1
}
//log.Printf("pageCnt %v\n", pageCnt)
for pageIndex := 0; pageIndex < pageCnt; pageIndex++ {
//log.Printf("pageIndex %v\n", pageIndex)
if pageIndex != (pageCnt - 1) {
NodeValueMap := make([]MQTTAliyunNodeValueTemplate, 0)
valueMap := make([]MQTTAliyunValueTemplate, 0)
node := r.NodeList[20*pageIndex : 20*pageIndex+20]
//log.Printf("nodeList %v\n", node)
for _, n := range node {
for _, c := range device.CollectInterfaceMap {
if c.CollInterfaceName == n.CollInterfaceName {
for _, d := range c.DeviceNodeMap {
if d.Name == n.Name {
for _, v := range d.VariableMap {
if len(v.Value) >= 1 {
index := len(v.Value) - 1
mqttAliyunValue := MQTTAliyunValueTemplate{}
mqttAliyunValue.Name = v.Name
mqttAliyunValue.Value = v.Value[index].Value
valueMap = append(valueMap, mqttAliyunValue)
}
}
NodeValue := MQTTAliyunNodeValueTemplate{}
NodeValue.ValueMap = valueMap
NodeValue.ProductKey = n.Param.ProductKey
NodeValue.DeviceName = n.Param.DeviceName
NodeValueMap = append(NodeValueMap, NodeValue)
}
}
}
}
}
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
MsgId := MQTTAliyunNodePropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, NodeValueMap) - 1
MsgIdStr := strconv.Itoa(MsgId)
sendMessage := ReportServiceSendMessageAliyunTemplate{
ID: MsgIdStr,
}
for _, v := range NodeValueMap {
sendMessage.DeviceName = append(sendMessage.DeviceName, v.DeviceName)
}
r.SendMessageMap = append(r.SendMessageMap, sendMessage)
setting.Logger.Debugf("service:%s,sendMessageMapAdd %v", r.GWParam.ServiceName, r.SendMessageMap)
} else { //最后一页
NodeValueMap := make([]MQTTAliyunNodeValueTemplate, 0)
valueMap := make([]MQTTAliyunValueTemplate, 0)
node := r.NodeList[20*pageIndex : len(r.NodeList)]
//log.Printf("nodeList %v\n", node)
for _, n := range node {
for _, c := range device.CollectInterfaceMap {
if c.CollInterfaceName == n.CollInterfaceName {
for _, d := range c.DeviceNodeMap {
if d.Name == n.Name {
for _, v := range d.VariableMap {
if len(v.Value) >= 1 {
index := len(v.Value) - 1
mqttAliyunValue := MQTTAliyunValueTemplate{}
mqttAliyunValue.Name = v.Name
mqttAliyunValue.Value = v.Value[index].Value
valueMap = append(valueMap, mqttAliyunValue)
}
}
NodeValue := MQTTAliyunNodeValueTemplate{}
NodeValue.ValueMap = valueMap
NodeValue.ProductKey = n.Param.ProductKey
NodeValue.DeviceName = n.Param.DeviceName
NodeValueMap = append(NodeValueMap, NodeValue)
}
}
}
}
}
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
//setting.Logger.Debugf("NodeValueMap %v", NodeValueMap)
MsgId := MQTTAliyunNodePropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, NodeValueMap) - 1
MsgIdStr := strconv.Itoa(MsgId)
sendMessage := ReportServiceSendMessageAliyunTemplate{
ID: MsgIdStr,
}
for _, v := range NodeValueMap {
sendMessage.DeviceName = append(sendMessage.DeviceName, v.DeviceName)
}
r.SendMessageMap = append(r.SendMessageMap, sendMessage)
setting.Logger.Debugf("service:%s,sendMessageMapAdd %v", r.GWParam.ServiceName, r.SendMessageMap)
}
}
}
//指定设备上传属性
func (r *ReportServiceParamAliyunTemplate) NodePropertyPost(name []string) {
nodeList := make([]ReportServiceNodeParamAliyunTemplate, 0)
for _, n := range name {
for k, v := range r.NodeList {
if n == v.Name {
nodeList = append(nodeList, v)
//上报故障计数值先加收到正确回应后清0
r.NodeList[k].ReportErrCnt++
}
}
}
pageCnt := len(nodeList) / 20 //单包最大发送20个设备
if len(nodeList)%20 != 0 {
pageCnt += 1
}
//log.Printf("pageCnt %v\n", pageCnt)
for pageIndex := 0; pageIndex < pageCnt; pageIndex++ {
//log.Printf("pageIndex %v\n", pageIndex)
if pageIndex != (pageCnt - 1) {
NodeValueMap := make([]MQTTAliyunNodeValueTemplate, 0)
valueMap := make([]MQTTAliyunValueTemplate, 0)
node := nodeList[20*pageIndex : 20*pageIndex+20]
//log.Printf("nodeList %v\n", node)
for _, n := range node {
for _, c := range device.CollectInterfaceMap {
if c.CollInterfaceName == n.CollInterfaceName {
for _, d := range c.DeviceNodeMap {
if d.Name == n.Name {
for _, v := range d.VariableMap {
if len(v.Value) >= 1 {
index := len(v.Value) - 1
mqttAliyunValue := MQTTAliyunValueTemplate{}
mqttAliyunValue.Name = v.Name
mqttAliyunValue.Value = v.Value[index].Value
valueMap = append(valueMap, mqttAliyunValue)
}
}
NodeValue := MQTTAliyunNodeValueTemplate{}
NodeValue.ValueMap = valueMap
NodeValue.ProductKey = n.Param.ProductKey
NodeValue.DeviceName = n.Param.DeviceName
NodeValueMap = append(NodeValueMap, NodeValue)
}
}
}
}
}
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
MsgId := MQTTAliyunNodePropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, NodeValueMap) - 1
MsgIdStr := strconv.Itoa(MsgId)
sendMessage := ReportServiceSendMessageAliyunTemplate{
ID: MsgIdStr,
}
for _, v := range NodeValueMap {
sendMessage.DeviceName = append(sendMessage.DeviceName, v.DeviceName)
}
r.SendMessageMap = append(r.SendMessageMap, sendMessage)
setting.Logger.Debugf("service:%s,sendMessageMapAdd %v", r.GWParam.ServiceName, r.SendMessageMap)
} else { //最后一页
NodeValueMap := make([]MQTTAliyunNodeValueTemplate, 0)
valueMap := make([]MQTTAliyunValueTemplate, 0)
node := nodeList[20*pageIndex : len(nodeList)]
//log.Printf("nodeList %v\n", node)
for _, n := range node {
for _, c := range device.CollectInterfaceMap {
if c.CollInterfaceName == n.CollInterfaceName {
for _, d := range c.DeviceNodeMap {
if d.Name == n.Name {
for _, v := range d.VariableMap {
if len(v.Value) >= 1 {
index := len(v.Value) - 1
mqttAliyunValue := MQTTAliyunValueTemplate{}
mqttAliyunValue.Name = v.Name
mqttAliyunValue.Value = v.Value[index].Value
valueMap = append(valueMap, mqttAliyunValue)
}
}
NodeValue := MQTTAliyunNodeValueTemplate{}
NodeValue.ValueMap = valueMap
NodeValue.ProductKey = n.Param.ProductKey
NodeValue.DeviceName = n.Param.DeviceName
NodeValueMap = append(NodeValueMap, NodeValue)
}
}
}
}
}
mqttAliyunRegister := MQTTAliyunRegisterTemplate{
RemoteIP: r.GWParam.IP,
RemotePort: r.GWParam.Port,
ProductKey: r.GWParam.Param.ProductKey,
DeviceName: r.GWParam.Param.DeviceName,
DeviceSecret: r.GWParam.Param.DeviceSecret,
}
//setting.Logger.Debugf("NodeValueMap %v", NodeValueMap)
MsgId := MQTTAliyunNodePropertyPost(r.GWParam.MQTTClient, mqttAliyunRegister, NodeValueMap) - 1
MsgIdStr := strconv.Itoa(MsgId)
sendMessage := ReportServiceSendMessageAliyunTemplate{
ID: MsgIdStr,
}
for _, v := range NodeValueMap {
sendMessage.DeviceName = append(sendMessage.DeviceName, v.DeviceName)
}
r.SendMessageMap = append(r.SendMessageMap, sendMessage)
setting.Logger.Debugf("service:%s,sendMessageMapAdd %v", r.GWParam.ServiceName, r.SendMessageMap)
}
}
}
func (r *ReportServiceParamAliyunTemplate) PropertyPost() {
//网关上报
nameMap := make([]string, 0)
nameMap = append(nameMap, r.GWParam.Param.DeviceName)
propertyPost := ReportServicePropertyPostAliyunTemplate{
DeviceName: nameMap,
DeviceType: 0,
}
r.PropertyPostChan <- propertyPost
//末端设备上报
nameMap = nameMap[0:0] //清空slice
for _, v := range r.NodeList {
nameMap = append(nameMap, v.Name)
}
if len(nameMap) > 0 {
propertyPost = ReportServicePropertyPostAliyunTemplate{
DeviceName: nameMap,
DeviceType: 1,
}
r.PropertyPostChan <- propertyPost
}
}
func ProcessPropertyPost(r *ReportServiceParamAliyunTemplate) {
func (r *ReportServiceParamAliyunTemplate) ProcessUpLinkFrame() {
for {
select {
case postParam := <-r.PropertyPostChan:
case reqFrame := <-r.LogInRequestFrameChan:
{
setting.Logger.Tracef("service %s,postParam %v,postChanCnt %v", r.GWParam.ServiceName, postParam, len(r.PropertyPostChan))
if postParam.DeviceType == 0 { //网关上报
r.GWPropertyPost()
} else if postParam.DeviceType == 1 { //末端设备上报
r.NodePropertyPost(postParam.DeviceName)
}
r.LogIn(reqFrame)
}
case reqFrame := <-r.LogOutRequestFrameChan:
{
r.LogOut(reqFrame)
}
case reqFrame := <-r.ReportNodePropertyRequestFrameChan:
{
r.NodePropertyPost(reqFrame)
}
}
}
}
func ReportServiceAliyunProcessGetSubDeviceProperty(r *ReportServiceParamAliyunTemplate, message MQTTAliyunMessageTemplate,
gw MQTTAliyunRegisterTemplate, cmdName string) {
func (r *ReportServiceParamAliyunTemplate) ProcessDownLinkFrame() {
addrArray := strings.Split(message.Params["Addr"].(string), ",")
for _, v := range addrArray {
for _, n := range r.NodeList {
if v == n.Param.DeviceName {
cmd := device.CommunicationCmdTemplate{}
cmd.CollInterfaceName = "coll1"
cmd.DeviceName = n.Addr
cmd.FunName = "GetRealVariables"
paramStr, _ := json.Marshal(message.Params)
cmd.FunPara = string(paramStr)
for {
select {
case frame := <-r.ReceiveFrameChan:
{
setting.Logger.Debugf("Recv TOPIC: %s\n", frame.Topic)
setting.Logger.Debugf("Recv MSG: %s\n", frame.Payload)
if strings.Contains(frame.Topic, "/thing/event/property/pack/post_reply") { //上报属性回应
ackFrame := MQTTAliyunReportPropertyAckTemplate{}
err := json.Unmarshal(frame.Payload, &ackFrame)
if err != nil {
setting.Logger.Errorf("ReportPropertyAck json unmarshal err")
return
}
r.ReceiveReportNodePropertyAckFrameChan <- ackFrame
} else if strings.Contains(frame.Topic, "/combine/batch_login_reply") { //子设备上线回应
ackFrame := MQTTAliyunLogInAckTemplate{}
err := json.Unmarshal(frame.Payload, &ackFrame)
if err != nil {
setting.Logger.Warningf("LogInAck json unmarshal err")
return
}
r.ReceiveLogInAckFrameChan <- ackFrame
} else if strings.Contains(frame.Topic, "/combine/batch_logout_reply") { //子设备下线回应
ackFrame := MQTTAliyunLogOutAckTemplate{}
err := json.Unmarshal(frame.Payload, &ackFrame)
if err != nil {
setting.Logger.Errorf("LogOutAck json unmarshal err")
return
}
r.ReceiveLogOutAckFrameChan <- ackFrame
} else if strings.Contains(frame.Topic, "/thing/service/property/set") { //设置属性请求
cmd := device.CommunicationCmdTemplate{}
cmd.CollInterfaceName = "coll1"
//cmd.DeviceAddr = property["Addr"]
cmd.FunName = "SetRemoteCmdAdjust"
//cmd.FunPara = string(bodyBuf[:n])
if len(device.CommunicationManage) > 0 {
if device.CommunicationManage[0].CommunicationManageAddEmergency(cmd) == true {
if len(device.CommunicationManage) > 0 {
if device.CommunicationManage[0].CommunicationManageAddEmergency(cmd) == true {
payload := MQTTAliyunThingServiceAckTemplate{
Identifier: cmdName,
ID: message.ID,
Code: 200,
Data: make(map[string]interface{}),
}
MQTTAliyunThingServiceAck(r.GWParam.MQTTClient, gw, payload)
} else {
payload := MQTTAliyunThingServiceAckTemplate{
Identifier: cmdName,
ID: message.ID,
Code: 1000,
Data: make(map[string]interface{}),
}
MQTTAliyunThingServiceAck(r.GWParam.MQTTClient, gw, payload)
}
}
}
@ -745,172 +292,41 @@ func ReportServiceAliyunProcessGetSubDeviceProperty(r *ReportServiceParamAliyunT
}
}
func ReportServiceAliyunProcessMessage(r *ReportServiceParamAliyunTemplate, topic string, payload []byte) {
func (r *ReportServiceParamAliyunTemplate) LogIn(nodeName []string) {
setting.Logger.Debugf("Recv TOPIC: %s\n", topic)
setting.Logger.Debugf("Recv MSG: %s\n", payload)
type ReportServiceAliyunMessageTemplate struct {
Topic string
Payload []byte
//清空接收chan避免出现有上次接收的缓存
for i := 0; i < len(r.ReceiveLogInAckFrameChan); i++ {
<-r.ReceiveLogInAckFrameChan
}
message := ReportServiceAliyunMessageTemplate{
Topic: topic,
Payload: payload,
r.NodeLogin(nodeName)
}
func (r *ReportServiceParamAliyunTemplate) LogOut(nodeName []string) {
//清空接收chan避免出现有上次接收的缓存
for i := 0; i < len(r.ReceiveLogOutAckFrameChan); i++ {
<-r.ReceiveLogOutAckFrameChan
}
if strings.Contains(topic, "/thing/event/property/pack/post_reply") { //上报属性回应
type MQTTAliyunPropertyPostAckTemplate struct {
Code int32 `json:"code"`
Data string `json:"-"`
ID string `json:"id"`
Message string `json:"message"`
Method string `json:"method"`
Version string `json:"version"`
}
property := MQTTAliyunPropertyPostAckTemplate{}
err := json.Unmarshal(payload, &property)
if err != nil {
setting.Logger.Errorf("PropertyPostAck json unmarshal err")
return
}
setting.Logger.Debugf("code %v\n", property.Code)
if property.Code == 200 {
ackMessage := ReportServiceReceiveMessageAliyunTemplate{
Topic: message.Topic,
Code: property.Code,
ID: property.ID,
}
lock.Lock()
for k, v := range r.SendMessageMap {
if v.ID == ackMessage.ID {
for _, name := range v.DeviceName {
if name == r.GWParam.Param.DeviceName { //网关设备
r.GWParam.ReportStatus = "onLine"
r.GWParam.ReportErrCnt = 0
setting.Logger.Infof("service:%s,gw online", r.GWParam.ServiceName)
} else { //末端设备
for i, n := range r.NodeList {
if name == n.Param.DeviceName {
r.NodeList[i].ReportStatus = "onLine"
r.NodeList[i].ReportErrCnt = 0
setting.Logger.Infof("service:%s,%s online", r.GWParam.ServiceName, n.Param.DeviceName)
}
}
}
}
setting.Logger.Debugf("service:%s,sendMessageMapPre %v", r.GWParam.ServiceName, r.SendMessageMap)
setting.Logger.Debugf("k:%v,id:%v", k, v.ID)
r.SendMessageMap = append(r.SendMessageMap[:k], r.SendMessageMap[k+1:]...)
setting.Logger.Debugf("service:%s,sendMessageMapNow %v", r.GWParam.ServiceName, r.SendMessageMap)
}
}
lock.Unlock()
}
} else if strings.Contains(topic, "/combine/batch_login_reply") { //子设备上线回应
type MQTTAliyunLogInDataTemplate struct {
ProductKey string `json:"productKey"`
DeviceName string `json:"deviceName"`
}
r.NodeLogOut(nodeName)
}
type MQTTAliyunLogInAckTemplate struct {
ID string `json:"id"`
Code int32 `json:"code"`
Message string `json:"message"`
Data []MQTTAliyunLogInDataTemplate `json:"data"`
}
func (r *ReportServiceParamAliyunTemplate) ReportTimeOut() {
property := MQTTAliyunLogInAckTemplate{}
err := json.Unmarshal(payload, &property)
if err != nil {
setting.Logger.Warningf("LogInAck json unmarshal err")
return
}
setting.Logger.Infof("code %v\n", property.Code)
if property.Code == 200 {
ackMessage := ReportServiceReceiveMessageAliyunTemplate{
Topic: message.Topic,
Code: property.Code,
ID: property.ID,
}
for k, v := range r.SendMessageMap {
if v.ID == ackMessage.ID {
for _, name := range v.DeviceName {
for i, n := range r.NodeList {
if name == n.Param.DeviceName {
r.NodeList[i].ReportStatus = "onLine"
r.NodeList[i].ReportErrCnt = 0
//网关上报
r.ReportGWPropertyRequestFrameChan <- true
}
}
}
r.SendMessageMap = append(r.SendMessageMap[:k], r.SendMessageMap[k+1:]...)
setting.Logger.Debugf("service:%s,sendMessageMapNow %v", r.GWParam.ServiceName, r.SendMessageMap)
}
}
}
} else if strings.Contains(topic, "/combine/batch_logout_reply") { //子设备下线回应
type MQTTAliyunLogOutDataTemplate struct {
Code int32 `json:"code"`
Message string `json:"message"`
ProductKey string `json:"productKey"`
DeviceName string `json:"deviceName"`
}
type MQTTAliyunLogOutAckTemplate struct {
ID string `json:"id"`
Code int32 `json:"code"`
Message string `json:"message"`
Data MQTTAliyunLogOutDataTemplate `json:"data"`
}
property := MQTTAliyunLogOutAckTemplate{}
err := json.Unmarshal(payload, &property)
if err != nil {
setting.Logger.Errorf("LogOutAck json unmarshal err")
return
}
setting.Logger.Debugf("code %v\n", property.Code)
if property.Code == 200 {
ackMessage := ReportServiceReceiveMessageAliyunTemplate{
Topic: message.Topic,
Code: property.Code,
ID: property.ID,
}
for k, v := range r.SendMessageMap {
if v.ID == ackMessage.ID {
for _, name := range v.DeviceName {
for i, n := range r.NodeList {
if name == n.Param.DeviceName {
r.NodeList[i].ReportStatus = "onLine"
r.NodeList[i].ReportErrCnt = 0
}
}
}
r.SendMessageMap = append(r.SendMessageMap[:k], r.SendMessageMap[k+1:]...)
setting.Logger.Debugf("service:%s,sendMessageMapNow %v", r.GWParam.ServiceName, r.SendMessageMap)
}
}
}
} else if strings.Contains(topic, "/thing/service/property/set") { //设置属性请求
cmd := device.CommunicationCmdTemplate{}
cmd.CollInterfaceName = "coll1"
//cmd.DeviceAddr = property["Addr"]
cmd.FunName = "SetRemoteCmdAdjust"
//cmd.FunPara = string(bodyBuf[:n])
if len(device.CommunicationManage) > 0 {
if device.CommunicationManage[0].CommunicationManageAddEmergency(cmd) == true {
}
}
//全部末端设备上报
nodeName := make([]string, 0)
for _, v := range r.NodeList {
nodeName = append(nodeName, v.Name)
}
r.ReportNodePropertyRequestFrameChan <- nodeName
}
//查看上报服务中设备是否离线
func (r *ReportServiceParamAliyunTemplate) CheckReportServiceOffline() {
func (r *ReportServiceParamAliyunTemplate) ReportOfflineTime() {
setting.Logger.Infof("service:%s,CheckReportOffline", r.GWParam.ServiceName)
if r.GWParam.ReportErrCnt >= 3 {
@ -940,13 +356,16 @@ func ReportServiceAliyunPoll(r *ReportServiceParamAliyunTemplate) {
reportOfflineTime := fmt.Sprintf("@every %dm%ds", (3*r.GWParam.ReportTime)/60, (3*r.GWParam.ReportTime)%60)
setting.Logger.Infof("reportServiceAliyun reportOfflineTime%v", reportOfflineTime)
_ = cronProcess.AddFunc(reportOfflineTime, r.CheckReportServiceOffline)
_ = cronProcess.AddFunc(reportOfflineTime, r.ReportOfflineTime)
cronProcess.Start()
defer cronProcess.Stop()
name := make([]string, 0)
go r.ProcessUpLinkFrame()
go r.ProcessDownLinkFrame()
name := make([]string, 0)
for {
switch reportState {
case 0:
@ -954,9 +373,7 @@ func ReportServiceAliyunPoll(r *ReportServiceParamAliyunTemplate) {
if r.GWLogin() == true {
reportState = 1
_ = cronProcess.AddFunc(reportTime, r.PropertyPost)
//_ = cronProcess.AddFunc(reportTime, r.GWPropertyPost)
//_ = cronProcess.AddFunc(reportTime, r.AllNodePropertyPost)
_ = cronProcess.AddFunc(reportTime, r.ReportTimeOut)
} else {
time.Sleep(5 * time.Second)
}
@ -976,7 +393,7 @@ func ReportServiceAliyunPoll(r *ReportServiceParamAliyunTemplate) {
}
if len(name) > 0 {
setting.Logger.Infof("DeviceOnline %v\n", name)
r.NodeLogin(name)
r.LogInRequestFrameChan <- name
name = name[0:0]
}
@ -988,7 +405,7 @@ func ReportServiceAliyunPoll(r *ReportServiceParamAliyunTemplate) {
}
if len(name) > 0 {
setting.Logger.Infof("DeviceOffline %v\n", name)
r.NodeLogOut(name)
r.LogOutRequestFrameChan <- name
name = name[0:0]
}
@ -1000,7 +417,11 @@ func ReportServiceAliyunPoll(r *ReportServiceParamAliyunTemplate) {
if v.Name == nodeName {
if v.ReportStatus == "offLine" { //当设备上报状态是离线时立马发送设备上线
name = append(name, nodeName)
go r.NodeLogin(name)
r.LogInRequestFrameChan <- name
name = name[0:0]
} else {
name = append(name, nodeName)
r.LogInRequestFrameChan <- name
name = name[0:0]
}
}