增加:1、增加了Emqx上报服务 2、routerReport增加Emqx相关接口

修改:1、commMangage修改了返回通信返回结果,既返回了状态也返回了数据
This commit is contained in:
pengwang 2021-08-03 20:11:44 +08:00
parent 00bc48c104
commit b93113a381
17 changed files with 1737 additions and 33 deletions

View File

@ -218,6 +218,7 @@ func NewCollectInterface(collInterfaceName, commInterfaceName string,
}
}
setting.Logger.Debugf("CommunicationInterfaceMap %v index %v", CommunicationInterfaceMap, index)
nodeManage := &CollectInterfaceTemplate{
CollInterfaceName: collInterfaceName,
CommInterfaceName: commInterfaceName,

View File

@ -14,10 +14,15 @@ type CommunicationCmdTemplate struct {
FunPara string
}
type CommunicationRxTemplate struct {
Status bool
RxBuf []byte
}
type CommunicationManageTemplate struct {
EmergencyRequestChan chan CommunicationCmdTemplate
CommonRequestChan chan CommunicationCmdTemplate
EmergencyAckChan chan bool
EmergencyAckChan chan CommunicationRxTemplate
CollInterface *CollectInterfaceTemplate
PacketChan chan []byte
}
@ -29,7 +34,7 @@ func NewCommunicationManageTemplate(coll *CollectInterfaceTemplate) *Communicati
template := &CommunicationManageTemplate{
EmergencyRequestChan: make(chan CommunicationCmdTemplate, 1),
CommonRequestChan: make(chan CommunicationCmdTemplate, 100),
EmergencyAckChan: make(chan bool, 1),
EmergencyAckChan: make(chan CommunicationRxTemplate, 1),
PacketChan: make(chan []byte, 100), //最多连续接收100帧数据
CollInterface: coll,
}
@ -44,7 +49,7 @@ func (c *CommunicationManageTemplate) CommunicationManageAddCommon(cmd Communica
c.CommonRequestChan <- cmd
}
func (c *CommunicationManageTemplate) CommunicationManageAddEmergency(cmd CommunicationCmdTemplate) bool {
func (c *CommunicationManageTemplate) CommunicationManageAddEmergency(cmd CommunicationCmdTemplate) CommunicationRxTemplate {
c.EmergencyRequestChan <- cmd
@ -77,9 +82,11 @@ func (c *CommunicationManageTemplate) AnalysisRx() {
}
}
func (c *CommunicationManageTemplate) CommunicationStateMachine(cmd CommunicationCmdTemplate) bool {
func (c *CommunicationManageTemplate) CommunicationStateMachine(cmd CommunicationCmdTemplate) CommunicationRxTemplate {
status := false
rxData := CommunicationRxTemplate{
Status: false,
}
startT := time.Now() //计算当前时间
for _, v := range c.CollInterface.DeviceNodeMap {
@ -174,16 +181,19 @@ func (c *CommunicationManageTemplate) CommunicationStateMachine(cmd Communicatio
}
rxTotalBufCnt = 0
rxTotalBuf = rxTotalBuf[0:0]
status = false
goto LoopCommonStep
}
//是否正确收到数据包
case <-v.AnalysisRx(v.Addr, v.VariableMap, rxTotalBuf, rxTotalBufCnt):
case rxStatus := <-v.AnalysisRx(v.Addr, v.VariableMap, rxTotalBuf, rxTotalBufCnt):
{
timerOut.Stop()
setting.Logger.Debugf("%v:rx ok", c.CollInterface.CollInterfaceName)
setting.Logger.Debugf("%v:rxbuf %X", c.CollInterface.CollInterfaceName, rxTotalBuf)
rxData.Status = rxStatus
rxData.RxBuf = rxTotalBuf
CommunicationMessage := CommunicationMessageTemplate{
CollName: c.CollInterface.CollInterfaceName,
TimeStamp: time.Now().Format("2006-01-02 15:04:05"),
@ -219,7 +229,6 @@ func (c *CommunicationManageTemplate) CommunicationStateMachine(cmd Communicatio
rxTotalBufCnt = 0
rxTotalBuf = rxTotalBuf[0:0]
status = true
goto LoopCommonStep
}
//继续接收数据
@ -262,7 +271,7 @@ func (c *CommunicationManageTemplate) CommunicationStateMachine(cmd Communicatio
}
}
return status
return rxData
}
func (c *CommunicationManageTemplate) CommunicationManageDel() {
@ -272,14 +281,13 @@ func (c *CommunicationManageTemplate) CommunicationManageDel() {
case cmd := <-c.EmergencyRequestChan:
{
setting.Logger.Infof("emergency chan collName %v nodeName %v funName %v", c.CollInterface.CollInterfaceName, cmd.DeviceName, cmd.FunName)
status := false
status = c.CommunicationStateMachine(cmd)
rxData := c.CommunicationStateMachine(cmd)
GetDeviceOnline()
GetDevicePacketLoss()
setting.Logger.Debugf("emergency chan status %v", status)
c.EmergencyAckChan <- status
setting.Logger.Debugf("emergency chan rxData %v", rxData)
c.EmergencyAckChan <- rxData
}
default:
{

View File

@ -672,7 +672,8 @@ func apiGetNodeReadVariable(context *gin.Context) {
cmd.DeviceName = c.DeviceNodeMap[nodeIndex].Name
cmd.FunName = "GetRealVariables"
cmd.FunPara = ""
if n.CommunicationManageAddEmergency(cmd) == true {
cmdRX := n.CommunicationManageAddEmergency(cmd)
if cmdRX.Status == true {
aParam.Code = "0"
aParam.Message = ""
aParam.Data = make([]VariableTemplate, 0)
@ -897,6 +898,7 @@ func apiAddCommInterface(context *gin.Context) {
}
device.CommunicationSerialMap = append(device.CommunicationSerialMap, SerialInterface)
device.WriteCommSerialInterfaceListToJson()
device.CommunicationInterfaceMap = append(device.CommunicationInterfaceMap, SerialInterface)
case "TcpClient":
TcpClient := device.TcpClientInterfaceParam{}
err = json.Unmarshal(Param, &TcpClient)
@ -913,6 +915,7 @@ func apiAddCommInterface(context *gin.Context) {
device.CommunicationTcpClientMap = append(device.CommunicationTcpClientMap, TcpClientInterface)
device.WriteCommTcpClientInterfaceListToJson()
device.CommunicationInterfaceMap = append(device.CommunicationInterfaceMap, TcpClientInterface)
case "IoOut":
IoOut := device.IoOutInterfaceParam{}
err = json.Unmarshal(Param, &IoOut)
@ -928,6 +931,7 @@ func apiAddCommInterface(context *gin.Context) {
}
device.CommunicationIoOutMap = append(device.CommunicationIoOutMap, IoOutInterface)
device.WriteCommIoOutInterfaceListToJson()
device.CommunicationInterfaceMap = append(device.CommunicationInterfaceMap, IoOutInterface)
case "IoIn":
IoIn := device.IoInInterfaceParam{}
err = json.Unmarshal(Param, &IoIn)
@ -943,6 +947,7 @@ func apiAddCommInterface(context *gin.Context) {
}
device.CommunicationIoInMap = append(device.CommunicationIoInMap, IoInInterface)
device.WriteCommIoInInterfaceListToJson()
device.CommunicationInterfaceMap = append(device.CommunicationInterfaceMap, IoInInterface)
}
aParam.Code = "0"
@ -1476,7 +1481,8 @@ func apiInvokeService(context *gin.Context) {
cmd.FunName = serviceInfo.ServiceName
paramStr, _ := json.Marshal(serviceInfo.ServiceParam)
cmd.FunPara = string(paramStr)
if n.CommunicationManageAddEmergency(cmd) == true {
cmdRX := n.CommunicationManageAddEmergency(cmd)
if cmdRX.Status == true {
aParam.Code = "0"
aParam.Message = ""
sJson, _ := json.Marshal(aParam)

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"goAdapter/report/mqttAliyun"
mqttEmqx "goAdapter/report/mqttEMQX"
"goAdapter/report/mqttHuawei"
"goAdapter/setting"
"net/http"
@ -63,6 +64,11 @@ func apiSetReportGWParam(context *gin.Context) {
}
mqttAliyun.ReportServiceParamListAliyun.AddReportService(ReportServiceGWParamAliyun)
case "EMQX.MQTT":
ReportServiceGWParamEmqx := mqttEmqx.ReportServiceGWParamEmqxTemplate{}
if err := json.Unmarshal(bodyBuf[:n], &ReportServiceGWParamEmqx); err != nil {
fmt.Println("ReportServiceGWParamEmqx json unMarshall err,", err)
}
mqttEmqx.ReportServiceParamListEmqx.AddReportService(ReportServiceGWParamEmqx)
case "Huawei.MQTT":
ReportServiceGWParamHuawei := mqttHuawei.ReportServiceGWParamHuaweiTemplate{}
if err := json.Unmarshal(bodyBuf[:n], &ReportServiceGWParamHuawei); err != nil {
@ -125,6 +131,20 @@ func apiGetReportGWParam(context *gin.Context) {
aParam.Data = append(aParam.Data, ReportService)
}
for _, v := range mqttEmqx.ReportServiceParamListEmqx.ServiceList {
ReportService := ReportServiceTemplate{}
ReportService.ServiceName = v.GWParam.ServiceName
ReportService.IP = v.GWParam.IP
ReportService.Port = v.GWParam.Port
ReportService.ReportTime = v.GWParam.ReportTime
ReportService.Protocol = v.GWParam.Protocol
ReportService.Param = v.GWParam.Param
ReportService.CommStatus = v.GWParam.ReportStatus
aParam.Data = append(aParam.Data, ReportService)
}
for _, v := range mqttHuawei.ReportServiceParamListHuawei.ServiceList {
ReportService := ReportServiceTemplate{}
@ -191,6 +211,21 @@ func apiDeleteReportGWParam(context *gin.Context) {
}
}
//查看Emqx
for _, v := range mqttEmqx.ReportServiceParamListEmqx.ServiceList {
if v.GWParam.ServiceName == param.ServiceName {
mqttEmqx.ReportServiceParamListEmqx.DeleteReportService(param.ServiceName)
aParam.Code = "0"
aParam.Message = ""
aParam.Data = ""
sJson, _ := json.Marshal(aParam)
context.String(http.StatusOK, string(sJson))
return
}
}
for _, v := range mqttHuawei.ReportServiceParamListHuawei.ServiceList {
if v.GWParam.ServiceName == param.ServiceName {
mqttHuawei.ReportServiceParamListHuawei.DeleteReportService(param.ServiceName)
@ -268,9 +303,18 @@ func apiSetReportNodeWParam(context *gin.Context) {
v.AddReportNode(ReportServiceNodeParamAliyun)
}
}
setting.Logger.Debugf("ParamListAliyun %v\n", mqttAliyun.ReportServiceParamListAliyun.ServiceList)
setting.Logger.Debugf("ParamListAliyun %v", mqttAliyun.ReportServiceParamListAliyun.ServiceList)
case "EMQX.MQTT":
ReportServiceNodeParamEmqx := mqttEmqx.ReportServiceNodeParamEmqxTemplate{}
if err := json.Unmarshal(bodyBuf[:n], &ReportServiceNodeParamEmqx); err != nil {
setting.Logger.Errorf("ReportServiceNodeParamEmqx json unMarshall err,", err)
}
for _, v := range mqttEmqx.ReportServiceParamListEmqx.ServiceList {
if v.GWParam.ServiceName == param.ServiceName {
v.AddReportNode(ReportServiceNodeParamEmqx)
}
}
setting.Logger.Debugf("ParamListAliyun %v", mqttAliyun.ReportServiceParamListAliyun.ServiceList)
case "Huawei.MQTT":
ReportServiceNodeParamHuawei := mqttHuawei.ReportServiceNodeParamHuaweiTemplate{}
if err := json.Unmarshal(bodyBuf[:n], &ReportServiceNodeParamHuawei); err != nil {
@ -415,6 +459,28 @@ func apiGetReportNodeWParam(context *gin.Context) {
}
}
for _, v := range mqttEmqx.ReportServiceParamListEmqx.ServiceList {
if v.GWParam.ServiceName == ServiceName {
ReportServiceNode := ReportServiceNodeTemplate{}
for _, d := range v.NodeList {
ReportServiceNode.ServiceName = d.ServiceName
ReportServiceNode.CollInterfaceName = d.CollInterfaceName
ReportServiceNode.Name = d.Name
ReportServiceNode.Addr = d.Addr
ReportServiceNode.Protocol = d.Protocol
ReportServiceNode.CommStatus = d.CommStatus
ReportServiceNode.ReportStatus = d.ReportStatus
ReportServiceNode.Param = d.Param
aParam.Data = append(aParam.Data, ReportServiceNode)
}
aParam.Code = "0"
aParam.Message = ""
sJson, _ := json.Marshal(aParam)
context.String(http.StatusOK, string(sJson))
return
}
}
for _, v := range mqttHuawei.ReportServiceParamListHuawei.ServiceList {
if v.GWParam.ServiceName == ServiceName {
ReportServiceNode := ReportServiceNodeTemplate{}
@ -497,6 +563,26 @@ func apiDeleteReportNodeWParam(context *gin.Context) {
}
}
//查看Emqx
for _, v := range mqttEmqx.ReportServiceParamListEmqx.ServiceList {
for _, n := range v.NodeList {
if (n.ServiceName == param.ServiceName) &&
(n.CollInterfaceName == param.CollInterfaceName) &&
(n.Addr == param.Addr) {
v.DeleteReportNode(param.Addr)
aParam.Code = "0"
aParam.Message = ""
aParam.Data = ""
sJson, _ := json.Marshal(aParam)
context.String(http.StatusOK, string(sJson))
return
}
}
}
for _, v := range mqttHuawei.ReportServiceParamListHuawei.ServiceList {
for _, n := range v.NodeList {
if (n.ServiceName == param.ServiceName) &&

View File

@ -34,16 +34,15 @@ func (r *ReportServiceParamAliyunTemplate) ProcessInvokeThingsService() {
cmd.FunPara = string(paramStr)
ack := MQTTAliyunInvokeThingsServiceAckTemplate{}
if len(device.CommunicationManage) > 0 {
if device.CommunicationManage[0].CommunicationManageAddEmergency(cmd) == true {
ack.ID = reqFrame.ID
ack.Code = 200
MQTTAliyunThingServiceAck(r.GWParam.MQTTClient, r.GWParam, ack, cmd.FunName)
} else {
ack.ID = reqFrame.ID
ack.Code = 1000
MQTTAliyunThingServiceAck(r.GWParam.MQTTClient, r.GWParam, ack, cmd.FunName)
}
ackData := v.CommunicationManageAddEmergency(cmd)
if ackData.Status {
ack.ID = reqFrame.ID
ack.Code = 200
MQTTAliyunThingServiceAck(r.GWParam.MQTTClient, r.GWParam, ack, cmd.FunName)
} else {
ack.ID = reqFrame.ID
ack.Code = 1000
MQTTAliyunThingServiceAck(r.GWParam.MQTTClient, r.GWParam, ack, cmd.FunName)
}
}
}

View File

@ -0,0 +1,160 @@
package mqttEmqx
import (
"encoding/json"
"goAdapter/setting"
"strconv"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
type MQTTNodeLoginParamTemplate struct {
ClientID string `json:"clientID"`
Timestamp int64 `json:"timestamp"`
}
type MQTTNodeLoginTemplate struct {
ID string `json:"id"`
Version string `json:"version"`
Params []MQTTNodeLoginParamTemplate `json:"params"`
}
var MsgID int = 0
func MQTTEmqxGWLogin(param ReportServiceGWParamEmqxTemplate, publishHandler MQTT.MessageHandler) (bool, MQTT.Client) {
opts := MQTT.NewClientOptions().AddBroker(param.IP + ":" + param.Port)
opts.SetClientID(param.Param.ClientID)
opts.SetUsername(param.Param.Username)
//hs256 := sha256.New()
//hs256.Write([]byte("zhsHrx123456@"))
//password := hs256.Sum(nil)
//param.Password = string(hex.EncodeToString(password))
//setting.Logger.Debugf("Emqx password %v", param.Password)
//param.Password = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6InpocyIsImNsaWVudGlkIjoiIiwiaWF0IjoxNjI2NzAwMzE1fQ.EICw6uVoP-_X2iKcdkmBTJevFspm7Nz9ipHjJpr8eHg"
opts.SetPassword(param.Param.Password)
opts.SetKeepAlive(60 * 2 * time.Second)
opts.SetDefaultPublishHandler(publishHandler)
opts.SetAutoReconnect(false)
// create and start a client using the above ClientOptions
mqttClient := MQTT.NewClient(opts)
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
setting.Logger.Errorf("Connect Emqx IoT Cloud fail %s", token.Error())
return false, nil
}
setting.Logger.Info("Connect Emqx IoT Cloud Sucess")
subTopic := ""
//子设备上线回应
subTopic = "/sys/thing/event/login/post_reply/" + param.Param.ClientID
MQTTEmqxSubscribeTopic(mqttClient, subTopic)
//子设备下线回应
subTopic = "/sys/thing/event/logout/post_reply/" + param.Param.ClientID
MQTTEmqxSubscribeTopic(mqttClient, subTopic)
//属性设置上报回应
subTopic = "/sys/thing/event/property/post_reply/" + param.Param.ClientID
MQTTEmqxSubscribeTopic(mqttClient, subTopic)
//订阅属性下发请求
subTopic = "/sys/thing/event/property/set/" + param.Param.ClientID
MQTTEmqxSubscribeTopic(mqttClient, subTopic)
//订阅属性查询请求
subTopic = "/sys/thing/event/property/get/" + param.Param.ClientID
MQTTEmqxSubscribeTopic(mqttClient, subTopic)
//订阅服务调用请求
subTopic = "/sys/thing/event/service/invoke/" + param.Param.ClientID
MQTTEmqxSubscribeTopic(mqttClient, subTopic)
return true, mqttClient
}
func MQTTEmqxSubscribeTopic(client MQTT.Client, topic string) {
if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
setting.Logger.Warningf("Subscribe topic %s fail %v", topic, token.Error())
}
setting.Logger.Info("Subscribe topic " + topic + " success")
}
func (r *ReportServiceParamEmqxTemplate) GWLogin() bool {
status := false
status, r.GWParam.MQTTClient = MQTTEmqxGWLogin(r.GWParam, ReceiveMessageHandler)
if status == true {
r.GWParam.ReportStatus = "onLine"
}
return status
}
func MQTTEmqxNodeLoginIn(param ReportServiceGWParamEmqxTemplate, nodeMap []string) int {
nodeLogin := MQTTNodeLoginTemplate{
ID: strconv.Itoa(MsgID),
Version: "V1.0",
}
MsgID++
for _, v := range nodeMap {
nodeLoginParam := MQTTNodeLoginParamTemplate{
ClientID: v,
Timestamp: time.Now().Unix(),
}
nodeLogin.Params = append(nodeLogin.Params, nodeLoginParam)
}
//批量注册
loginInTopic := "/sys/thing/event/login/post/" + param.Param.ClientID
sJson, _ := json.Marshal(nodeLogin)
if len(nodeLogin.Params) > 0 {
setting.Logger.Debugf("node publish logInMsg: %s", sJson)
setting.Logger.Infof("node publish topic: %s", loginInTopic)
if param.MQTTClient != nil {
token := param.MQTTClient.Publish(loginInTopic, 0, false, sJson)
token.Wait()
}
}
return MsgID
}
func (r *ReportServiceParamEmqxTemplate) NodeLogIn(name []string) bool {
nodeMap := make([]string, 0)
status := false
setting.Logger.Debugf("nodeLoginName %v", name)
for _, d := range name {
for _, v := range r.NodeList {
if d == v.Name {
nodeMap = append(nodeMap, v.Name)
MQTTEmqxNodeLoginIn(r.GWParam, nodeMap)
select {
case frame := <-r.ReceiveLogInAckFrameChan:
{
if frame.Code == 200 {
status = true
}
}
case <-time.After(time.Millisecond * 2000):
{
status = false
}
}
}
}
}
return status
}

View File

@ -0,0 +1,84 @@
package mqttEmqx
import (
"encoding/json"
"goAdapter/setting"
"strconv"
"time"
)
type MQTTNodeLogoutParamTemplate struct {
ClientID string `json:"clientID"`
Timestamp int64 `json:"timestamp"`
}
type MQTTNodeLogoutTemplate struct {
ID string `json:"id"`
Version string `json:"version"`
Params []MQTTNodeLogoutParamTemplate `json:"params"`
}
func MQTTEmqxNodeLogOut(param ReportServiceGWParamEmqxTemplate, nodeMap []string) int {
nodeLogout := MQTTNodeLogoutTemplate{
ID: strconv.Itoa(MsgID),
Version: "V1.0",
}
MsgID++
for _, v := range nodeMap {
nodeLogoutParam := MQTTNodeLogoutParamTemplate{
ClientID: v,
Timestamp: time.Now().Unix(),
}
nodeLogout.Params = append(nodeLogout.Params, nodeLogoutParam)
}
//批量注册
LogoutInTopic := "/sys/thing/event/Logout/post/" + param.Param.ClientID
sJson, _ := json.Marshal(nodeLogout)
if len(nodeLogout.Params) > 0 {
setting.Logger.Debugf("node publish LogoutMsg: %s", sJson)
setting.Logger.Infof("node publish topic: %s", LogoutInTopic)
if param.MQTTClient != nil {
token := param.MQTTClient.Publish(LogoutInTopic, 0, false, sJson)
token.Wait()
}
}
return MsgID
}
func (r *ReportServiceParamEmqxTemplate) NodeLogOut(name []string) bool {
nodeMap := make([]string, 0)
status := false
setting.Logger.Debugf("nodeLogoutName %v", name)
for _, d := range name {
for _, v := range r.NodeList {
if d == v.Name {
nodeMap = append(nodeMap, v.Name)
MQTTEmqxNodeLogOut(r.GWParam, nodeMap)
select {
case frame := <-r.ReceiveLogOutAckFrameChan:
{
if frame.Code == 200 {
status = true
}
}
case <-time.After(time.Millisecond * 2000):
{
status = false
}
}
}
}
}
return status
}

View File

@ -0,0 +1,145 @@
package mqttEmqx
import (
"encoding/json"
"fmt"
"goAdapter/device"
"goAdapter/setting"
"time"
)
type MQTTEmqxReadPropertyRequestParamPropertyTemplate struct {
Name string `json:"name"`
}
type MQTTEmqxReadPropertyRequestParamTemplate struct {
ClientID string `json:"clientID"`
Properties []MQTTEmqxReadPropertyRequestParamPropertyTemplate `json:"properties"`
}
type MQTTEmqxReadPropertyRequestTemplate struct {
ID string `json:"id"`
Version string `json:"version"`
Ack int `json:"ack"`
Params []MQTTEmqxReadPropertyRequestParamTemplate `json:"params"`
}
type MQTTEmqxReadPropertyAckParamPropertyTemplate struct {
Name string `json:"name"`
Value interface{} `json:"value"`
Timestamp int64 `json:"timestamp"`
}
type MQTTEmqxReadPropertyAckParamTemplate struct {
ClientID string `json:"clientID"`
Properties []MQTTEmqxReadPropertyAckParamPropertyTemplate `json:"properties"`
}
type MQTTEmqxReadPropertyAckTemplate struct {
ID string `json:"id"`
Version string `json:"version"`
Code int `json:"code"`
Params []MQTTEmqxReadPropertyAckParamTemplate `json:"params"`
}
func (r *ReportServiceParamEmqxTemplate) ReportServiceEmqxReadPropertyAck(reqFrame MQTTEmqxReadPropertyRequestTemplate, code int, ackParams []MQTTEmqxReadPropertyAckParamTemplate) {
ackFrame := MQTTEmqxReadPropertyAckTemplate{
ID: reqFrame.ID,
Version: reqFrame.Version,
Code: code,
Params: ackParams,
}
sJson, _ := json.Marshal(ackFrame)
propertyPostTopic := "/sys/thing/event/property/get_reply/" + r.GWParam.Param.ClientID
setting.Logger.Infof("property get_reply topic: %s", propertyPostTopic)
setting.Logger.Debugf("property get_reply: %v", string(sJson))
if r.GWParam.MQTTClient != nil {
token := r.GWParam.MQTTClient.Publish(propertyPostTopic, 0, false, sJson)
token.Wait()
}
}
func (r *ReportServiceParamEmqxTemplate) ReportServiceEmqxProcessReadProperty(reqFrame MQTTEmqxReadPropertyRequestTemplate) {
ReadStatus := false
ackParams := make([]MQTTEmqxReadPropertyAckParamTemplate, 0)
for _, v := range reqFrame.Params {
for _, node := range r.NodeList {
if v.ClientID == node.Param.ClientID {
//从上报节点中找到相应节点
for _, coll := range device.CollectInterfaceMap {
if coll.CollInterfaceName == node.CollInterfaceName {
for _, n := range coll.DeviceNodeMap {
if n.Name == node.Name {
//从采集服务中找到相应节点
cmd := device.CommunicationCmdTemplate{}
cmd.CollInterfaceName = node.CollInterfaceName
cmd.DeviceName = node.Name
cmd.FunName = "GetRealVariables"
nameMap := make([]string, 0)
for _, pro := range v.Properties {
nameMap = append(nameMap, pro.Name)
}
paramStr, _ := json.Marshal(nameMap)
cmd.FunPara = string(paramStr)
ackParam := MQTTEmqxReadPropertyAckParamTemplate{
ClientID: node.Param.ClientID,
}
property := MQTTEmqxReadPropertyAckParamPropertyTemplate{}
timeStamp := time.Now().Unix()
//从采集队列中找到
for _, comm := range device.CommunicationManage {
if comm.CollInterface == coll {
ackData := comm.CommunicationManageAddEmergency(cmd)
if ackData.Status {
ReadStatus = true
for _, p := range v.Properties {
for _, variable := range n.VariableMap {
if p.Name == variable.Name {
if len(variable.Value) >= 1 {
index := len(variable.Value) - 1
property.Name = variable.Name
property.Timestamp = timeStamp
switch t := variable.Value[index].Value.(type) {
case uint8, uint16, int16, uint32, uint64:
property.Value = fmt.Sprintf("%d", variable.Value[index].Value)
case string:
property.Value = variable.Value[index].Value.(string)
default:
setting.Logger.Debugf("valueType %T", t)
}
ackParam.Properties = append(ackParam.Properties, property)
}
}
}
}
} else {
ReadStatus = false
for _, p := range v.Properties {
property.Name = p.Name
property.Value = 1
ackParam.Properties = append(ackParam.Properties, property)
}
}
}
}
ackParams = append(ackParams, ackParam)
}
}
}
}
}
}
}
if ReadStatus == true {
r.ReportServiceEmqxReadPropertyAck(reqFrame, 0, ackParams)
} else {
r.ReportServiceEmqxReadPropertyAck(reqFrame, 1, ackParams)
}
}

View File

@ -0,0 +1,63 @@
package mqttEmqx
import (
"goAdapter/setting"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
type MQTTEmqxReceiveFrameTemplate struct {
Topic string
Payload []byte
}
type MQTTEmqxLogInDataTemplate struct {
ProductKey string `json:"productKey"`
DeviceName string `json:"deviceName"`
}
type MQTTEmqxLogInAckTemplate struct {
ID string `json:"id"`
Code int32 `json:"code"`
Message string `json:"message"`
Data []MQTTEmqxLogInDataTemplate `json:"data"`
}
type MQTTEmqxLogOutDataTemplate struct {
Code int32 `json:"code"`
Message string `json:"message"`
ProductKey string `json:"productKey"`
DeviceName string `json:"deviceName"`
}
type MQTTEmqxLogOutAckTemplate struct {
ID string `json:"id"`
Code int32 `json:"code"`
Message string `json:"message"`
Data []MQTTEmqxLogOutDataTemplate `json:"data"`
}
type MQTTEmqxReportPropertyAckTemplate struct {
Code int32 `json:"code"`
Data string `json:"-"`
ID string `json:"id"`
Message string `json:"message"`
Method string `json:"method"`
Version string `json:"version"`
}
//发送数据回调函数
func ReceiveMessageHandler(client MQTT.Client, msg MQTT.Message) {
for k, v := range ReportServiceParamListEmqx.ServiceList {
if v.GWParam.MQTTClient == client {
receiveFrame := MQTTEmqxReceiveFrameTemplate{
Topic: msg.Topic(),
Payload: msg.Payload(),
}
setting.Logger.Debugf("Recv TOPIC: %s", receiveFrame.Topic)
setting.Logger.Debugf("Recv MSG: %v", receiveFrame.Payload)
ReportServiceParamListEmqx.ServiceList[k].ReceiveFrameChan <- receiveFrame
}
}
}

View File

@ -0,0 +1,353 @@
package mqttEmqx
import (
"encoding/json"
"goAdapter/device"
"goAdapter/setting"
"strconv"
"time"
)
type MQTTEmqxReportPropertyTemplate struct {
DeviceType string //设备类型,"gw" "node"
DeviceName []string
}
type MQTTEmqxPropertyPostParamPropertyTemplate struct {
Name string `json:"name"`
Value interface{} `json:"value"`
TimeStamp int64 `json:"timestamp"`
}
type MQTTEmqxPropertyPostParamTemplate struct {
ClientID string `json:"clientID"`
Properties []MQTTEmqxPropertyPostParamPropertyTemplate `json:"properties"`
}
type MQTTEmqxPropertyPostTemplate struct {
ID string `json:"id"`
Version string `json:"version"`
Ack int `json:"ack"`
Params []MQTTEmqxPropertyPostParamTemplate `json:"params"`
}
func MQTTEmqxPropertyPost(gwParam ReportServiceGWParamEmqxTemplate, propertyParam []MQTTEmqxPropertyPostParamTemplate) int {
propertyPost := MQTTEmqxPropertyPostTemplate{
ID: strconv.Itoa(MsgID),
Version: "V1.0",
Ack: 1,
Params: propertyParam,
}
MsgID++
sJson, _ := json.Marshal(propertyPost)
propertyPostTopic := "/sys/thing/event/property/post/" + gwParam.Param.ClientID
setting.Logger.Infof("property post topic: %s", propertyPostTopic)
setting.Logger.Debugf("property post msg: %v", string(sJson))
if gwParam.MQTTClient != nil {
token := gwParam.MQTTClient.Publish(propertyPostTopic, 0, false, sJson)
token.Wait()
}
return MsgID
}
func (r *ReportServiceParamEmqxTemplate) GWPropertyPost() {
propertyMap := make([]MQTTEmqxPropertyPostParamPropertyTemplate, 0)
property := MQTTEmqxPropertyPostParamPropertyTemplate{}
timeStamp := time.Now().Unix()
property.Name = "MemTotal"
property.Value = setting.SystemState.MemTotal
property.TimeStamp = timeStamp
propertyMap = append(propertyMap, property)
property.Name = "MemUse"
property.Value = setting.SystemState.MemUse
property.TimeStamp = timeStamp
propertyMap = append(propertyMap, property)
property.Name = "DiskTotal"
property.Value = setting.SystemState.DiskTotal
property.TimeStamp = timeStamp
propertyMap = append(propertyMap, property)
property.Name = "DiskUse"
property.Value = setting.SystemState.DiskUse
property.TimeStamp = timeStamp
propertyMap = append(propertyMap, property)
property.Name = "Name"
property.Value = setting.SystemState.Name
property.TimeStamp = timeStamp
propertyMap = append(propertyMap, property)
property.Name = "SN"
property.Value = setting.SystemState.SN
property.TimeStamp = timeStamp
propertyMap = append(propertyMap, property)
property.Name = "HardVer"
property.Value = setting.SystemState.HardVer
property.TimeStamp = timeStamp
propertyMap = append(propertyMap, property)
property.Name = "SoftVer"
property.Value = setting.SystemState.SoftVer
property.TimeStamp = timeStamp
propertyMap = append(propertyMap, property)
property.Name = "SystemRTC"
property.Value = setting.SystemState.SystemRTC
property.TimeStamp = timeStamp
propertyMap = append(propertyMap, property)
property.Name = "RunTime"
property.Value = setting.SystemState.RunTime
property.TimeStamp = timeStamp
propertyMap = append(propertyMap, property)
property.Name = "DeviceOnline"
property.Value = setting.SystemState.DeviceOnline
property.TimeStamp = timeStamp
propertyMap = append(propertyMap, property)
property.Name = "DevicePacketLoss"
property.Value = setting.SystemState.DevicePacketLoss
property.TimeStamp = timeStamp
propertyMap = append(propertyMap, property)
//上报故障先加收到正确回应后清0
r.GWParam.ReportErrCnt++
setting.Logger.Debugf("service %s gw ReportErrCnt %d", r.GWParam.Param.ClientID, r.GWParam.ReportErrCnt)
//清空接收缓存
for i := 0; i < len(r.ReceiveReportPropertyAckFrameChan); i++ {
<-r.ReceiveReportPropertyAckFrameChan
}
propertyPostParam := MQTTEmqxPropertyPostParamTemplate{
ClientID: r.GWParam.Param.ClientID,
Properties: propertyMap,
}
propertyPostParamMap := make([]MQTTEmqxPropertyPostParamTemplate, 0)
propertyPostParamMap = append(propertyPostParamMap, propertyPostParam)
MQTTEmqxPropertyPost(r.GWParam, propertyPostParamMap)
select {
case frame := <-r.ReceiveReportPropertyAckFrameChan:
{
setting.Logger.Debugf("frameCode %v", frame.Code)
if frame.Code == 200 {
r.GWParam.ReportErrCnt--
setting.Logger.Debugf("%s MQTTEmqxGWPropertyPost OK", r.GWParam.ServiceName)
} else {
setting.Logger.Debugf("%s MQTTEmqxGWPropertyPost Err", r.GWParam.ServiceName)
}
}
case <-time.After(time.Millisecond * 2000):
{
setting.Logger.Debugf("%s MQTTEmqxGWPropertyPost Err", r.GWParam.ServiceName)
}
}
}
//指定设备上传属性
func (r *ReportServiceParamEmqxTemplate) NodePropertyPost(name []string) {
propertyPostParamMap := make([]MQTTEmqxPropertyPostParamTemplate, 0)
for _, n := range name {
for k, v := range r.NodeList {
if n == v.Name {
//上报故障计数值先加收到正确回应后清0
r.NodeList[k].ReportErrCnt++
propertyPostParam := MQTTEmqxPropertyPostParamTemplate{
ClientID: v.Param.ClientID,
}
timeStamp := time.Now().Unix()
for _, c := range device.CollectInterfaceMap {
if c.CollInterfaceName == v.CollInterfaceName {
for _, d := range c.DeviceNodeMap {
if d.Name == v.Name {
for _, v := range d.VariableMap {
if len(v.Value) >= 1 {
index := len(v.Value) - 1
property := MQTTEmqxPropertyPostParamPropertyTemplate{}
property.Name = v.Name
property.Value = v.Value[index].Value
property.TimeStamp = timeStamp
propertyPostParam.Properties = append(propertyPostParam.Properties, property)
}
}
}
}
}
}
propertyPostParamMap = append(propertyPostParamMap, propertyPostParam)
}
}
}
setting.Logger.Debugf("propertyPostParamMap %v", propertyPostParamMap)
pageCnt := len(propertyPostParamMap) / 20 //单包最大发送20个设备
if len(propertyPostParamMap)%20 != 0 {
pageCnt += 1
}
for pageIndex := 0; pageIndex < pageCnt; pageIndex++ {
if pageIndex != (pageCnt - 1) {
MQTTEmqxPropertyPost(r.GWParam, propertyPostParamMap[pageIndex:pageIndex+20])
} else { //最后一页
MQTTEmqxPropertyPost(r.GWParam, propertyPostParamMap[pageIndex+20*(pageCnt-1):])
}
select {
case frame := <-r.ReceiveReportPropertyAckFrameChan:
{
if frame.Code == 200 {
setting.Logger.Debugf("%s MQTTEmqxNodePropertyPost OK", r.GWParam.ServiceName)
} else {
setting.Logger.Debugf("%s MQTTEmqxNodePropertyPost Err", r.GWParam.ServiceName)
}
}
case <-time.After(time.Millisecond * 2000):
{
setting.Logger.Debugf("%s MQTTEmqxNodePropertyPost Err", r.GWParam.ServiceName)
}
}
}
}
//func (r *ReportServiceParamEmqxTemplate) NodePropertyPost(name []string) {
//
// nodeList := make([]ReportServiceNodeParamEmqxTemplate, 0)
// for _, n := range name {
// for k, v := range r.NodeList {
// if n == v.Name {
// nodeList = append(nodeList, v)
// //上报故障计数值先加收到正确回应后清0
// r.NodeList[k].ReportErrCnt++
// }
// }
// }
//
// pageCnt := len(nodeList) / 20 //单包最大发送20个设备
// if len(nodeList)%20 != 0 {
// pageCnt += 1
// }
//
// for pageIndex := 0; pageIndex < pageCnt; pageIndex++ {
// if pageIndex != (pageCnt - 1) {
// propertyPost := MQTTPropertyPostTemplate{
// ID: strconv.Itoa(MsgID),
// Version: "V1.0",
// Ack: 1,
// }
// node := nodeList[20*pageIndex : 20*pageIndex+20]
// for _, n := range node {
// for _, c := range device.CollectInterfaceMap {
// if c.CollInterfaceName == n.CollInterfaceName {
// for _, d := range c.DeviceNodeMap {
// if d.Name == n.Name {
// propertyPostParam := MQTTPropertyPostParamTemplate{
// ClientID: n
// }
// for _, v := range d.VariableMap {
// if len(v.Value) >= 1 {
// index := len(v.Value) - 1
// property := propertyTemplate{}
// property.Name = v.Name
// property.Value = v.Value[index].Value
// valueMap = append(valueMap, property)
// }
// }
// NodeValue := MQTTEmqxNodeValueTemplate{}
// NodeValue.ValueMap = valueMap
// NodeValue.ProductKey = n.Param.ProductKey
// NodeValue.DeviceName = n.Param.DeviceName
// NodeValueMap = append(NodeValueMap, NodeValue)
// }
// }
// }
// }
// }
//
//
// MsgID++
// MQTTEmqxPropertyPost(r.GWParam, NodeValueMap)
// select {
// case frame := <-r.ReceiveReportPropertyAckFrameChan:
// {
// if frame.Code == 200 {
// setting.Logger.Debugf("%s MQTTEmqxNodePropertyPost OK", r.GWParam.ServiceName)
// } else {
// setting.Logger.Debugf("%s MQTTEmqxNodePropertyPost Err", r.GWParam.ServiceName)
// }
// }
// case <-time.After(time.Millisecond * 2000):
// {
// setting.Logger.Debugf("%s MQTTEmqxNodePropertyPost Err", r.GWParam.ServiceName)
// }
// }
// } else { //最后一页
// NodeValueMap := make([]MQTTEmqxNodeValueTemplate, 0)
// valueMap := make([]propertyTemplate, 0)
// node := nodeList[20*pageIndex : len(nodeList)]
// //log.Printf("nodeList %v\n", node)
// for _, n := range node {
// for _, c := range device.CollectInterfaceMap {
// if c.CollInterfaceName == n.CollInterfaceName {
// for _, d := range c.DeviceNodeMap {
// if d.Name == n.Name {
// for _, v := range d.VariableMap {
// if len(v.Value) >= 1 {
// index := len(v.Value) - 1
// property := propertyTemplate{}
// property.Name = v.Name
// property.Value = v.Value[index].Value
// valueMap = append(valueMap, property)
// }
// }
// NodeValue := MQTTEmqxNodeValueTemplate{}
// NodeValue.ValueMap = valueMap
// NodeValue.ProductKey = n.Param.ProductKey
// NodeValue.DeviceName = n.Param.DeviceName
// NodeValueMap = append(NodeValueMap, NodeValue)
// }
// }
// }
// }
// }
//
// mqttEmqxRegister := MQTTEmqxRegisterTemplate{
// RemoteIP: r.GWParam.IP,
// RemotePort: r.GWParam.Port,
// ProductKey: r.GWParam.Param.ProductKey,
// DeviceName: r.GWParam.Param.DeviceName,
// DeviceSecret: r.GWParam.Param.DeviceSecret,
// }
// //setting.Logger.Debugf("NodeValueMap %v", NodeValueMap)
// MQTTEmqxNodePropertyPost(r.GWParam.MQTTClient, mqttEmqxRegister, NodeValueMap)
//
// select {
// case frame := <-r.ReceiveReportPropertyAckFrameChan:
// {
// if frame.Code == 200 {
// setting.Logger.Debugf("%s MQTTEmqxNodePropertyPost OK", r.GWParam.ServiceName)
// } else {
// setting.Logger.Debugf("%s MQTTEmqxNodePropertyPost Err", r.GWParam.ServiceName)
// }
// }
// case <-time.After(time.Millisecond * 2000):
// {
// setting.Logger.Debugf("%s MQTTEmqxNodePropertyPost Err", r.GWParam.ServiceName)
// }
// }
// }
// }
//}

View File

@ -0,0 +1,107 @@
package mqttEmqx
import (
"encoding/json"
"goAdapter/device"
"goAdapter/setting"
)
type MQTTEmqxInvokeServiceAckParamTemplate struct {
ClientID string `json:"clientID"`
CmdName string `json:"cmdName"`
CmdStatus int `json:"cmdStatus"`
}
type MQTTEmqxInvokeServiceAckTemplate struct {
ID string `json:"id"`
Version string `json:"version"`
Code int `json:"Code"`
Params []MQTTEmqxInvokeServiceAckParamTemplate `json:"params"`
}
type MQTTEmqxInvokeServiceRequestParamTemplate struct {
ClientID string `json:"clientID"`
CmdName string `json:"cmdName"`
CmdParams map[string]interface{} `json:"cmdParams"`
}
type MQTTEmqxInvokeServiceRequestTemplate struct {
ID string `json:"id"`
Version string `json:"version"`
Ack int `json:"ack"`
Params []MQTTEmqxInvokeServiceRequestParamTemplate `json:"params"`
}
func (r *ReportServiceParamEmqxTemplate) ReportServiceEmqxInvokeServiceAck(reqFrame MQTTEmqxInvokeServiceRequestTemplate, code int, ackParams []MQTTEmqxInvokeServiceAckParamTemplate) {
ackFrame := MQTTEmqxInvokeServiceAckTemplate{
ID: reqFrame.ID,
Version: reqFrame.Version,
Code: code,
Params: ackParams,
}
sJson, _ := json.Marshal(ackFrame)
serviceInvokeTopic := "/sys/thing/event/service/invoke_reply/" + r.GWParam.Param.ClientID
setting.Logger.Infof("service invoke_reply topic: %s", serviceInvokeTopic)
setting.Logger.Debugf("service invoke_reply: %v", string(sJson))
if r.GWParam.MQTTClient != nil {
token := r.GWParam.MQTTClient.Publish(serviceInvokeTopic, 0, false, sJson)
token.Wait()
}
}
func (r *ReportServiceParamEmqxTemplate) ReportServiceEmqxProcessInvokeService(reqFrame MQTTEmqxInvokeServiceRequestTemplate) {
ReadStatus := false
ackParams := make([]MQTTEmqxInvokeServiceAckParamTemplate, 0)
for _, v := range reqFrame.Params {
for _, node := range r.NodeList {
if v.ClientID == node.Param.ClientID {
//从上报节点中找到相应节点
for _, coll := range device.CollectInterfaceMap {
if coll.CollInterfaceName == node.CollInterfaceName {
for _, n := range coll.DeviceNodeMap {
if n.Name == node.Name {
//从采集服务中找到相应节点
cmd := device.CommunicationCmdTemplate{}
cmd.CollInterfaceName = node.CollInterfaceName
cmd.DeviceName = node.Name
cmd.FunName = v.CmdName
paramStr, _ := json.Marshal(v.CmdParams)
cmd.FunPara = string(paramStr)
ackParam := MQTTEmqxInvokeServiceAckParamTemplate{
ClientID: node.Param.ClientID,
CmdName: v.CmdName,
}
//从采集队列中找到
for _, comm := range device.CommunicationManage {
if comm.CollInterface == coll {
ackData := comm.CommunicationManageAddEmergency(cmd)
if ackData.Status {
ReadStatus = true
ackParam.CmdStatus = 0
} else {
ReadStatus = false
ackParam.CmdStatus = 1
}
}
}
ackParams = append(ackParams, ackParam)
}
}
}
}
}
}
}
if ReadStatus == true {
r.ReportServiceEmqxInvokeServiceAck(reqFrame, 0, ackParams)
} else {
r.ReportServiceEmqxInvokeServiceAck(reqFrame, 1, ackParams)
}
}

View File

@ -0,0 +1,50 @@
package mqttEmqx
import (
"bytes"
"crypto/hmac"
"crypto/sha1"
"fmt"
)
type AuthInfo struct {
password string
username string
mqttClientId string
}
func MqttClient_CalculateSign(productKey, deviceName, deviceSecret, timeStamp string) AuthInfo {
clientId := productKey + "&" + deviceName
var raw_passwd bytes.Buffer
raw_passwd.WriteString("clientId")
raw_passwd.WriteString(clientId)
raw_passwd.WriteString("deviceName")
raw_passwd.WriteString(deviceName)
raw_passwd.WriteString("productKey")
raw_passwd.WriteString(productKey)
raw_passwd.WriteString("timestamp")
raw_passwd.WriteString(timeStamp)
//log.Println(raw_passwd.String())
// hmac, use sha1
mac := hmac.New(sha1.New, []byte(deviceSecret))
mac.Write([]byte(raw_passwd.String()))
password := fmt.Sprintf("%02x", mac.Sum(nil))
//log.Println(password)
username := deviceName + "&" + productKey
var MQTTClientId bytes.Buffer
MQTTClientId.WriteString(clientId)
MQTTClientId.WriteString("|securemode=3,signmethod=hmacsha1,timestamp=")
MQTTClientId.WriteString(timeStamp)
MQTTClientId.WriteString("|")
auth := AuthInfo{
password: password,
username: username,
mqttClientId: MQTTClientId.String(),
}
return auth
}

View File

@ -0,0 +1,116 @@
package mqttEmqx
import (
"encoding/json"
"goAdapter/device"
"goAdapter/setting"
)
type MQTTEmqxWritePropertyRequestParamPropertyTemplate struct {
Name string `json:"name"`
Value interface{} `json:"value"`
}
type MQTTEmqxWritePropertyRequestParamTemplate struct {
ClientID string `json:"clientID"`
Properties []MQTTEmqxWritePropertyRequestParamPropertyTemplate `json:"properties"`
}
type MQTTEmqxWritePropertyRequestTemplate struct {
ID string `json:"id"`
Version string `json:"version"`
Ack int `json:"ack"`
Params []MQTTEmqxWritePropertyRequestParamTemplate `json:"params"`
}
type MQTTEmqxWritePropertyAckTemplate struct {
ID string `json:"id"`
Version string `json:"version"`
Code int `json:"code"`
Params []MQTTEmqxWritePropertyRequestParamTemplate `json:"params"`
}
func (r *ReportServiceParamEmqxTemplate) ReportServiceEmqxWritePropertyAck(reqFrame MQTTEmqxWritePropertyRequestTemplate, code int, ackParams []MQTTEmqxWritePropertyRequestParamTemplate) {
ackFrame := MQTTEmqxWritePropertyAckTemplate{
ID: reqFrame.ID,
Version: reqFrame.Version,
Code: code,
Params: ackParams,
}
sJson, _ := json.Marshal(ackFrame)
propertyPostTopic := "/sys/thing/event/property/set_reply/" + r.GWParam.Param.ClientID
setting.Logger.Infof("property set_reply topic: %s", propertyPostTopic)
setting.Logger.Debugf("property set_reply: %v", string(sJson))
if r.GWParam.MQTTClient != nil {
token := r.GWParam.MQTTClient.Publish(propertyPostTopic, 0, false, sJson)
token.Wait()
}
}
func (r *ReportServiceParamEmqxTemplate) ReportServiceEmqxProcessWriteProperty(reqFrame MQTTEmqxWritePropertyRequestTemplate) {
writeStatus := false
ackParams := make([]MQTTEmqxWritePropertyRequestParamTemplate, 0)
for _, v := range reqFrame.Params {
for _, node := range r.NodeList {
if v.ClientID == node.Param.ClientID {
//从上报节点中找到相应节点
for _, coll := range device.CollectInterfaceMap {
if coll.CollInterfaceName == node.CollInterfaceName {
for _, n := range coll.DeviceNodeMap {
if n.Name == node.Name {
//从采集服务中找到相应节点
cmd := device.CommunicationCmdTemplate{}
cmd.CollInterfaceName = node.CollInterfaceName
cmd.DeviceName = node.Name
cmd.FunName = "SetVariables"
valueMap := make(map[string]interface{})
for _, pro := range v.Properties {
valueMap[pro.Name] = pro.Value
}
paramStr, _ := json.Marshal(valueMap)
cmd.FunPara = string(paramStr)
param := MQTTEmqxWritePropertyRequestParamTemplate{
ClientID: node.Param.ClientID,
}
property := MQTTEmqxWritePropertyRequestParamPropertyTemplate{}
for _, comm := range device.CommunicationManage {
if comm.CollInterface == coll {
ackData := comm.CommunicationManageAddEmergency(cmd)
if ackData.Status {
writeStatus = true
for _, p := range v.Properties {
property.Name = p.Name
property.Value = 0
param.Properties = append(param.Properties, property)
}
} else {
writeStatus = false
for _, p := range v.Properties {
property.Name = p.Name
property.Value = 1
param.Properties = append(param.Properties, property)
}
}
}
}
ackParams = append(ackParams, param)
}
}
}
}
}
}
}
if writeStatus == true {
r.ReportServiceEmqxWritePropertyAck(reqFrame, 0, ackParams)
} else {
r.ReportServiceEmqxWritePropertyAck(reqFrame, 1, ackParams)
}
}

View File

@ -1,6 +1,526 @@
package mqttEMQX
package mqttEmqx
type ReportServiceGWParamEmqxTemplate struct {
ProductKey string
DeviceName string
import (
"encoding/json"
"fmt"
"goAdapter/device"
"goAdapter/setting"
"log"
"os"
"path/filepath"
"strings"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/robfig/cron"
)
//上报节点参数结构体
type ReportServiceNodeParamEmqxTemplate struct {
ServiceName string
CollInterfaceName string
Name string
Addr string
CommStatus string
ReportErrCnt int `json:"-"`
ReportStatus string
Protocol string
Param struct {
ClientID string
}
}
//上报网关参数结构体
type ReportServiceGWParamEmqxTemplate struct {
ServiceName string
IP string
Port string
ReportStatus string
ReportTime int
ReportErrCnt int
Protocol string
Param struct {
Username string
Password string
ClientID string
}
MQTTClient MQTT.Client `json:"-"`
}
//上报服务参数,网关参数,节点参数
type ReportServiceParamEmqxTemplate struct {
GWParam ReportServiceGWParamEmqxTemplate
NodeList []ReportServiceNodeParamEmqxTemplate
ReceiveFrameChan chan MQTTEmqxReceiveFrameTemplate `json:"-"`
LogInRequestFrameChan chan []string `json:"-"` //上线
ReceiveLogInAckFrameChan chan MQTTEmqxLogInAckTemplate `json:"-"`
LogOutRequestFrameChan chan []string `json:"-"`
ReceiveLogOutAckFrameChan chan MQTTEmqxLogOutAckTemplate `json:"-"`
ReportPropertyRequestFrameChan chan MQTTEmqxReportPropertyTemplate `json:"-"`
ReceiveReportPropertyAckFrameChan chan MQTTEmqxReportPropertyAckTemplate `json:"-"`
ReceiveInvokeServiceRequestFrameChan chan MQTTEmqxInvokeServiceRequestTemplate `json:"-"`
ReceiveInvokeServiceAckFrameChan chan MQTTEmqxInvokeServiceAckTemplate `json:"-"`
ReceiveWritePropertyRequestFrameChan chan MQTTEmqxWritePropertyRequestTemplate `json:"-"`
ReceiveReadPropertyRequestFrameChan chan MQTTEmqxReadPropertyRequestTemplate `json:"-"`
}
type ReportServiceParamListEmqxTemplate struct {
ServiceList []*ReportServiceParamEmqxTemplate
}
//实例化上报服务
var ReportServiceParamListEmqx = &ReportServiceParamListEmqxTemplate{
ServiceList: make([]*ReportServiceParamEmqxTemplate, 0),
}
func ReportServiceEmqxInit() {
ReportServiceParamListEmqx.ReadParamFromJson()
//初始化
for _, v := range ReportServiceParamListEmqx.ServiceList {
v.ReceiveFrameChan = make(chan MQTTEmqxReceiveFrameTemplate, 100)
v.LogInRequestFrameChan = make(chan []string, 0)
v.ReceiveLogInAckFrameChan = make(chan MQTTEmqxLogInAckTemplate, 5)
v.LogOutRequestFrameChan = make(chan []string, 0)
v.ReceiveLogOutAckFrameChan = make(chan MQTTEmqxLogOutAckTemplate, 5)
v.ReportPropertyRequestFrameChan = make(chan MQTTEmqxReportPropertyTemplate, 50)
v.ReceiveReportPropertyAckFrameChan = make(chan MQTTEmqxReportPropertyAckTemplate, 50)
v.ReceiveInvokeServiceRequestFrameChan = make(chan MQTTEmqxInvokeServiceRequestTemplate, 50)
v.ReceiveInvokeServiceAckFrameChan = make(chan MQTTEmqxInvokeServiceAckTemplate, 50)
v.ReceiveWritePropertyRequestFrameChan = make(chan MQTTEmqxWritePropertyRequestTemplate, 50)
v.ReceiveReadPropertyRequestFrameChan = make(chan MQTTEmqxReadPropertyRequestTemplate, 50)
go ReportServiceEmqxPoll(v)
}
}
func fileExist(path string) bool {
_, err := os.Lstat(path)
return !os.IsNotExist(err)
}
func (s *ReportServiceParamListEmqxTemplate) ReadParamFromJson() bool {
exeCurDir, _ := filepath.Abs(filepath.Dir(os.Args[0]))
fileDir := exeCurDir + "/selfpara/reportServiceParamListEmqx.json"
if fileExist(fileDir) == true {
fp, err := os.OpenFile(fileDir, os.O_RDONLY, 0777)
if err != nil {
log.Println("open reportServiceParamListEmqx.json err,", err)
return false
}
defer fp.Close()
data := make([]byte, 20480)
dataCnt, err := fp.Read(data)
err = json.Unmarshal(data[:dataCnt], s)
if err != nil {
log.Println("reportServiceParamListEmqx unmarshal err", err)
return false
}
setting.Logger.Info("read reportServiceParamListEmqx.json ok")
return true
} else {
setting.Logger.Warn("reportServiceParamListEmqx.json is not exist")
return false
}
}
func (s *ReportServiceParamListEmqxTemplate) WriteParamToJson() {
exeCurDir, _ := filepath.Abs(filepath.Dir(os.Args[0]))
fileDir := exeCurDir + "/selfpara/reportServiceParamListEmqx.json"
fp, err := os.OpenFile(fileDir, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0777)
if err != nil {
log.Println("open reportServiceParamListEmqx.json err", err)
return
}
defer fp.Close()
sJson, _ := json.Marshal(*s)
_, err = fp.Write(sJson)
if err != nil {
setting.Logger.Errorf("write reportServiceParamListEmqx.json err", err)
}
setting.Logger.Debugf("write reportServiceParamListEmqx.json success")
}
func (s *ReportServiceParamListEmqxTemplate) AddReportService(param ReportServiceGWParamEmqxTemplate) {
for k, v := range s.ServiceList {
//存在相同的,表示修改;不存在表示增加
if v.GWParam.ServiceName == param.ServiceName {
s.ServiceList[k].GWParam = param
s.WriteParamToJson()
return
}
}
ReportServiceParam := &ReportServiceParamEmqxTemplate{
GWParam: param,
}
s.ServiceList = append(s.ServiceList, ReportServiceParam)
s.WriteParamToJson()
}
func (s *ReportServiceParamListEmqxTemplate) DeleteReportService(serviceName string) {
for k, v := range s.ServiceList {
if v.GWParam.ServiceName == serviceName {
s.ServiceList = append(s.ServiceList[:k], s.ServiceList[k+1:]...)
s.WriteParamToJson()
return
}
}
}
func (r *ReportServiceParamEmqxTemplate) AddReportNode(param ReportServiceNodeParamEmqxTemplate) {
param.CommStatus = "offLine"
param.ReportStatus = "offLine"
param.ReportErrCnt = 0
//节点存在则进行修改
for k, v := range r.NodeList {
//节点已经存在
if v.Name == param.Name {
r.NodeList[k] = param
ReportServiceParamListEmqx.WriteParamToJson()
return
}
}
//节点不存在则新建
r.NodeList = append(r.NodeList, param)
ReportServiceParamListEmqx.WriteParamToJson()
setting.Logger.Debugf("param %v", ReportServiceParamListEmqx)
}
func (r *ReportServiceParamEmqxTemplate) DeleteReportNode(name string) int {
index := -1
//节点存在则进行修改
for k, v := range r.NodeList {
//节点已经存在
if v.Name == name {
index = k
r.NodeList = append(r.NodeList[:k], r.NodeList[k+1:]...)
ReportServiceParamListEmqx.WriteParamToJson()
return index
}
}
return index
}
func (r *ReportServiceParamEmqxTemplate) ProcessUpLinkFrame() {
for {
select {
case reqFrame := <-r.LogInRequestFrameChan:
{
r.LogIn(reqFrame)
}
case reqFrame := <-r.LogOutRequestFrameChan:
{
r.LogOut(reqFrame)
}
case reqFrame := <-r.ReportPropertyRequestFrameChan:
{
if reqFrame.DeviceType == "gw" {
r.GWPropertyPost()
} else if reqFrame.DeviceType == "node" {
r.NodePropertyPost(reqFrame.DeviceName)
}
}
case reqFrame := <-r.ReceiveWritePropertyRequestFrameChan:
{
r.ReportServiceEmqxProcessWriteProperty(reqFrame)
}
case reqFrame := <-r.ReceiveReadPropertyRequestFrameChan:
{
r.ReportServiceEmqxProcessReadProperty(reqFrame)
}
case reqFrame := <-r.ReceiveInvokeServiceRequestFrameChan:
{
r.ReportServiceEmqxProcessInvokeService(reqFrame)
}
}
}
}
func (r *ReportServiceParamEmqxTemplate) ProcessDownLinkFrame() {
for {
select {
case frame := <-r.ReceiveFrameChan:
{
//setting.Logger.Debugf("Recv TOPIC: %s", frame.Topic)
//setting.Logger.Debugf("Recv MSG: %v", frame.Payload)
if strings.Contains(frame.Topic, "/thing/event/property/pack/post_reply") { //网关、子设备上报属性回应
ackFrame := MQTTEmqxReportPropertyAckTemplate{}
err := json.Unmarshal(frame.Payload, &ackFrame)
if err != nil {
setting.Logger.Errorf("ReportPropertyAck json unmarshal err")
return
}
r.ReceiveReportPropertyAckFrameChan <- ackFrame
} else if strings.Contains(frame.Topic, "/combine/batch_login_reply") { //子设备上线回应
ackFrame := MQTTEmqxLogInAckTemplate{}
err := json.Unmarshal(frame.Payload, &ackFrame)
if err != nil {
setting.Logger.Warningf("LogInAck json unmarshal err")
return
}
r.ReceiveLogInAckFrameChan <- ackFrame
} else if strings.Contains(frame.Topic, "/combine/batch_logout_reply") { //子设备下线回应
ackFrame := MQTTEmqxLogOutAckTemplate{}
err := json.Unmarshal(frame.Payload, &ackFrame)
if err != nil {
setting.Logger.Errorf("LogOutAck json unmarshal err")
return
}
r.ReceiveLogOutAckFrameChan <- ackFrame
} else if strings.Contains(frame.Topic, "/sys/thing/event/service/invoke") { //设备服务调用
serviceFrame := MQTTEmqxInvokeServiceRequestTemplate{}
err := json.Unmarshal(frame.Payload, &serviceFrame)
if err != nil {
setting.Logger.Errorf("serviceFrame json unmarshal err")
return
}
r.ReceiveInvokeServiceRequestFrameChan <- serviceFrame
} else if strings.Contains(frame.Topic, "/sys/thing/event/property/set") { //设置属性请求
writePropertyFrame := MQTTEmqxWritePropertyRequestTemplate{}
err := json.Unmarshal(frame.Payload, &writePropertyFrame)
if err != nil {
setting.Logger.Errorf("writePropertyFrame json unmarshal err")
return
}
r.ReceiveWritePropertyRequestFrameChan <- writePropertyFrame
} else if strings.Contains(frame.Topic, "/sys/thing/event/property/get") { //获取属性请求
readPropertyFrame := MQTTEmqxReadPropertyRequestTemplate{}
err := json.Unmarshal(frame.Payload, &readPropertyFrame)
if err != nil {
setting.Logger.Errorf("readPropertyFrame json unmarshal err")
return
}
r.ReceiveReadPropertyRequestFrameChan <- readPropertyFrame
}
}
}
}
}
func (r *ReportServiceParamEmqxTemplate) LogIn(nodeName []string) {
//清空接收chan避免出现有上次接收的缓存
for i := 0; i < len(r.ReceiveLogInAckFrameChan); i++ {
<-r.ReceiveLogInAckFrameChan
}
r.NodeLogIn(nodeName)
}
func (r *ReportServiceParamEmqxTemplate) LogOut(nodeName []string) {
//清空接收chan避免出现有上次接收的缓存
for i := 0; i < len(r.ReceiveLogOutAckFrameChan); i++ {
<-r.ReceiveLogOutAckFrameChan
}
r.NodeLogOut(nodeName)
}
//查看上报服务中设备通信状态
func (r *ReportServiceParamEmqxTemplate) ReportCommStatusTimeFun() {
setting.Logger.Infof("service:%s CheckCommStatus", r.GWParam.ServiceName)
for k, n := range r.NodeList {
name := make([]string, 0)
for _, c := range device.CollectInterfaceMap {
if c.CollInterfaceName == n.CollInterfaceName {
for _, d := range c.DeviceNodeMap {
if n.Name == d.Name {
//通信状态发生了改变
if d.CommStatus != n.CommStatus {
if d.CommStatus == "onLine" {
setting.Logger.Infof("DeviceOnline %v", n.Name)
name = append(name, n.Name)
r.LogInRequestFrameChan <- name
} else if d.CommStatus == "offLine" {
setting.Logger.Infof("DeviceOffline %v", n.Name)
name = append(name, n.Name)
r.LogOutRequestFrameChan <- name
}
r.NodeList[k].CommStatus = d.CommStatus
}
}
}
}
}
}
}
func (r *ReportServiceParamEmqxTemplate) ReportTimeFun() {
if r.GWParam.ReportStatus == "onLine" {
//网关上报
reportGWProperty := MQTTEmqxReportPropertyTemplate{
DeviceType: "gw",
}
r.ReportPropertyRequestFrameChan <- reportGWProperty
//全部末端设备上报
nodeName := make([]string, 0)
for _, v := range r.NodeList {
nodeName = append(nodeName, v.Name)
}
setting.Logger.Debugf("report Nodes %v", nodeName)
if len(nodeName) > 0 {
reportNodeProperty := MQTTEmqxReportPropertyTemplate{
DeviceType: "node",
DeviceName: nodeName,
}
r.ReportPropertyRequestFrameChan <- reportNodeProperty
}
}
}
//查看上报服务中设备是否离线
func (r *ReportServiceParamEmqxTemplate) ReportOfflineTimeFun() {
setting.Logger.Infof("service:%s CheckReportOffline", r.GWParam.ServiceName)
if r.GWParam.ReportErrCnt >= 3 {
r.GWParam.ReportStatus = "offLine"
r.GWParam.ReportErrCnt = 0
setting.Logger.Warningf("service:%s gw offline", r.GWParam.ServiceName)
}
for k, v := range r.NodeList {
if v.ReportErrCnt >= 3 {
r.NodeList[k].ReportStatus = "offLine"
r.NodeList[k].ReportErrCnt = 0
setting.Logger.Warningf("service:%s %s offline", v.ServiceName, v.Name)
}
}
}
func ReportServiceEmqxPoll(r *ReportServiceParamEmqxTemplate) {
reportState := 0
// 定义一个cron运行器
cronProcess := cron.New()
//每10s查看一下上报节点的通信状态
reportCommStatusTime := fmt.Sprintf("@every %dm%ds", 10/60, 10%60)
setting.Logger.Infof("reportServiceEmqx reportCommStatusTime%v", reportCommStatusTime)
reportTime := fmt.Sprintf("@every %dm%ds", r.GWParam.ReportTime/60, r.GWParam.ReportTime%60)
setting.Logger.Infof("reportServiceEmqx reportTime%v", reportTime)
reportOfflineTime := fmt.Sprintf("@every %dm%ds", (3*r.GWParam.ReportTime)/60, (3*r.GWParam.ReportTime)%60)
setting.Logger.Infof("reportServiceEmqx reportOfflineTime%v", reportOfflineTime)
_ = cronProcess.AddFunc(reportCommStatusTime, r.ReportCommStatusTimeFun)
_ = cronProcess.AddFunc(reportOfflineTime, r.ReportOfflineTimeFun)
_ = cronProcess.AddFunc(reportTime, r.ReportTimeFun)
cronProcess.Start()
defer cronProcess.Stop()
go r.ProcessUpLinkFrame()
go r.ProcessDownLinkFrame()
//name := make([]string, 0)
for {
switch reportState {
case 0:
{
if r.GWLogin() == true {
reportState = 1
} else {
time.Sleep(5 * time.Second)
}
}
case 1:
{
//网关
if r.GWParam.ReportStatus == "offLine" {
reportState = 0
r.GWParam.ReportErrCnt = 0
}
//for k, v := range r.NodeList {
// commStatus := "offLine"
// for _, d := range device.CollectInterfaceMap {
// if v.CollInterfaceName == v.CollInterfaceName {
// for _, n := range d.DeviceNodeMap {
// if v.Name == n.Name {
// commStatus = n.CommStatus
// break
// }
// }
// }
// }
// if commStatus != v.CommStatus {
// if commStatus == "onLine" {
// //节点发生了上线
// setting.Logger.Debugf("service %s,node %s onLine", v.ServiceName, v.Name)
// name = append(name, v.Name)
// r.LogInRequestFrameChan <- name
// name = name[0:0]
// } else if commStatus == "offLine" {
// //节点发生了离线
// setting.Logger.Debugf("service %s,node %s onLine", v.ServiceName, v.Name)
// name = append(name, v.Name)
// r.LogOutRequestFrameChan <- name
// name = name[0:0]
// }
// r.NodeList[k].CommStatus = commStatus
// }
//}
//节点有属性变化
//for _, c := range device.CollectInterfaceMap {
// for i := 0; i < len(c.PropertyReportChan); i++ {
// nodeName := <-c.PropertyReportChan
// for _, v := range r.NodeList {
// if v.Name == nodeName {
// if v.ReportStatus == "offLine" { //当设备上报状态是离线时立马发送设备上线
// name = append(name, nodeName)
// r.LogInRequestFrameChan <- name
// name = name[0:0]
// } else {
// name = append(name, nodeName)
// reportPropertyTemplate := MQTTEmqxReportPropertyTemplate{
// DeviceType: "node",
// DeviceName: name,
// }
// r.ReportPropertyRequestFrameChan <- reportPropertyTemplate
// name = name[0:0]
// }
// }
// }
// }
//}
}
}
time.Sleep(100 * time.Millisecond)
}
}

View File

@ -64,7 +64,8 @@ func ReportServiceHuaweiProcessWriteCmd(r *ReportServiceParamHuaweiTemplate, req
cmd.FunPara = string(paramStr)
cmdAck := MQTTHuaweiWriteCmdAckTemplate{}
if device.CommunicationManage[y].CommunicationManageAddEmergency(cmd) == true {
cmdRX := device.CommunicationManage[y].CommunicationManageAddEmergency(cmd)
if cmdRX.Status == true {
setting.Logger.Debugf("WriteCmd ok")
cmdAck.ResultCode = 0
cmdAck.ResponseName = request.ServiceID

View File

@ -61,7 +61,8 @@ func ReportServiceHuaweiProcessGetProperties(r *ReportServiceParamHuaweiTemplate
cmd.FunName = "GetRealVariables"
cmd.FunPara = ""
if device.CommunicationManage[y].CommunicationManageAddEmergency(cmd) == true {
cmdRX := device.CommunicationManage[y].CommunicationManageAddEmergency(cmd)
if cmdRX.Status == true {
setting.Logger.Debugf("GetRealVariables ok")
service := MQTTHuaweiServiceTemplate{}
for _, v := range device.CollectInterfaceMap[y].DeviceNodeMap[i].VariableMap {

View File

@ -2,6 +2,7 @@ package report
import (
"goAdapter/report/mqttAliyun"
mqttEmqx "goAdapter/report/mqttEMQX"
"goAdapter/report/mqttHuawei"
)
@ -22,5 +23,8 @@ func ReportServiceInit() {
mqttAliyun.ReportServiceAliyunInit()
mqttEmqx.ReportServiceEmqxInit()
mqttHuawei.ReportServiceHuaweiInit()
}