增加:1、增加eventBus事件总线
修改:1、采集接口和通信队列增加对eventBus的支持 2、Emqx上报服务适配eventBus
This commit is contained in:
parent
76b39c290d
commit
b6c4212068
|
@ -2,25 +2,13 @@ package device
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"goAdapter/device/eventBus"
|
||||
"goAdapter/setting"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
const (
|
||||
MaxCollectInterfaceManage int = 2
|
||||
|
||||
InterFaceID0 int = 0
|
||||
InterFaceID1 int = 1
|
||||
InterFaceID2 int = 2
|
||||
InterFaceID3 int = 3
|
||||
InterFaceID4 int = 4
|
||||
InterFaceID5 int = 5
|
||||
InterFaceID6 int = 6
|
||||
InterFaceID7 int = 7
|
||||
)
|
||||
|
||||
type CommunicationMessageTemplate struct {
|
||||
CollName string `json:"CollInterfaceName"` //接口名称
|
||||
TimeStamp string `json:"TimeStamp"` //时间戳
|
||||
|
@ -28,6 +16,11 @@ type CommunicationMessageTemplate struct {
|
|||
Content string `json:"DataContent"` //数据内容
|
||||
}
|
||||
|
||||
type CollectInterfaceEventTemplate struct {
|
||||
Topic string
|
||||
Content interface{}
|
||||
}
|
||||
|
||||
//采集接口模板
|
||||
type CollectInterfaceTemplate struct {
|
||||
CollInterfaceName string `json:"CollInterfaceName"` //采集接口
|
||||
|
@ -42,6 +35,7 @@ type CollectInterfaceTemplate struct {
|
|||
OnlineReportChan chan string `json:"-"`
|
||||
OfflineReportChan chan string `json:"-"`
|
||||
PropertyReportChan chan string `json:"-"`
|
||||
CollEventBus eventBus.Bus `json:"-"` //事件总线
|
||||
}
|
||||
|
||||
var CollectInterfaceMap = make([]*CollectInterfaceTemplate, 0)
|
||||
|
@ -231,6 +225,7 @@ func NewCollectInterface(collInterfaceName, commInterfaceName string,
|
|||
OfflineReportChan: make(chan string, 100),
|
||||
OnlineReportChan: make(chan string, 100),
|
||||
PropertyReportChan: make(chan string, 100),
|
||||
CollEventBus: eventBus.NewBus(),
|
||||
}
|
||||
|
||||
return nodeManage
|
||||
|
@ -274,9 +269,14 @@ func (d *CollectInterfaceTemplate) NewDeviceNode(dName string, dType string, dAd
|
|||
node.CommSuccessCnt = 0
|
||||
node.CurCommFailCnt = 0
|
||||
node.CommStatus = "offLine"
|
||||
node.VariableMap = make([]VariableTemplate, 0)
|
||||
variables := node.NewVariables()
|
||||
node.VariableMap = append(node.VariableMap, variables...)
|
||||
//node.VariableMap = make([]VariableTemplate, 0)
|
||||
//variables := node.NewVariables()
|
||||
//node.VariableMap = append(node.VariableMap, variables...)
|
||||
|
||||
properties := node.NewVariables()
|
||||
node.Properties = append(node.Properties, properties...)
|
||||
services := node.NewServices()
|
||||
node.Services = append(node.Services, services...)
|
||||
|
||||
d.DeviceNodeMap = append(d.DeviceNodeMap, node)
|
||||
}
|
||||
|
@ -293,9 +293,12 @@ func (d *CollectInterfaceTemplate) AddDeviceNode(dName string, dType string, dAd
|
|||
node.CommSuccessCnt = 0
|
||||
node.CurCommFailCnt = 0
|
||||
node.CommStatus = "offLine"
|
||||
node.VariableMap = make([]VariableTemplate, 0)
|
||||
variables := node.NewVariables()
|
||||
node.VariableMap = append(node.VariableMap, variables...)
|
||||
//node.VariableMap = make([]VariableTemplate, 0)
|
||||
//variables := node.NewVariables()
|
||||
//node.VariableMap = append(node.VariableMap, variables...)
|
||||
|
||||
properties := node.NewVariables()
|
||||
node.Properties = append(node.Properties, properties...)
|
||||
|
||||
d.DeviceNodeMap = append(d.DeviceNodeMap, node)
|
||||
|
||||
|
|
|
@ -176,6 +176,14 @@ func (c *CommunicationManageTemplate) CommunicationStateMachine(cmd Communicatio
|
|||
<-c.CollInterface.OfflineReportChan
|
||||
}
|
||||
c.CollInterface.OfflineReportChan <- v.Name
|
||||
content := CollectInterfaceEventTemplate{
|
||||
Topic: "offLine",
|
||||
Content: v.Name,
|
||||
}
|
||||
err := c.CollInterface.CollEventBus.Publish("offLine", content)
|
||||
if err != nil {
|
||||
setting.Logger.Debugf("coll %v publish offLine", c.CollInterface.CollInterfaceName)
|
||||
}
|
||||
}
|
||||
v.CommStatus = "offLine"
|
||||
}
|
||||
|
@ -185,7 +193,7 @@ func (c *CommunicationManageTemplate) CommunicationStateMachine(cmd Communicatio
|
|||
goto LoopCommonStep
|
||||
}
|
||||
//是否正确收到数据包
|
||||
case rxStatus := <-v.AnalysisRx(v.Addr, v.VariableMap, rxTotalBuf, rxTotalBufCnt):
|
||||
case rxStatus := <-v.AnalysisRx(v.Addr, v.Properties, rxTotalBuf, rxTotalBufCnt):
|
||||
{
|
||||
timerOut.Stop()
|
||||
setting.Logger.Debugf("%v:rx ok", c.CollInterface.CollInterfaceName)
|
||||
|
@ -213,6 +221,14 @@ func (c *CommunicationManageTemplate) CommunicationStateMachine(cmd Communicatio
|
|||
<-c.CollInterface.OnlineReportChan
|
||||
}
|
||||
c.CollInterface.OnlineReportChan <- v.Name
|
||||
content := CollectInterfaceEventTemplate{
|
||||
Topic: "onLine",
|
||||
Content: v.Name,
|
||||
}
|
||||
err := c.CollInterface.CollEventBus.Publish("onLine", content)
|
||||
if err != nil {
|
||||
setting.Logger.Debugf("coll %v publish onLine", c.CollInterface.CollInterfaceName)
|
||||
}
|
||||
}
|
||||
|
||||
//防止Chan阻塞
|
||||
|
@ -222,6 +238,15 @@ func (c *CommunicationManageTemplate) CommunicationStateMachine(cmd Communicatio
|
|||
c.CollInterface.PropertyReportChan <- v.Addr
|
||||
//log.Printf("reportChan %v\n", len(c.CollInterface.PropertyReportChan))
|
||||
|
||||
content := CollectInterfaceEventTemplate{
|
||||
Topic: "update",
|
||||
Content: v.Name,
|
||||
}
|
||||
err := c.CollInterface.CollEventBus.Publish("update", content)
|
||||
if err != nil {
|
||||
setting.Logger.Debugf("coll %v publish update", c.CollInterface.CollInterfaceName)
|
||||
}
|
||||
|
||||
v.CommSuccessCnt++
|
||||
v.CurCommFailCnt = 0
|
||||
v.CommStatus = "onLine"
|
||||
|
|
|
@ -13,96 +13,123 @@ import (
|
|||
luar "layeh.com/gopher-luar"
|
||||
)
|
||||
|
||||
var MaxDeviceNodeCnt int = 50
|
||||
var lock sync.Mutex
|
||||
|
||||
type ValueTemplate struct {
|
||||
Value interface{} //变量值,不可以是字符串
|
||||
Explain interface{} //变量值解释,必须是字符串
|
||||
TimeStamp string
|
||||
}
|
||||
//type ValueTemplate struct {
|
||||
// Value interface{} //变量值,不可以是字符串
|
||||
// Explain interface{} //变量值解释,必须是字符串
|
||||
// TimeStamp string
|
||||
//}
|
||||
|
||||
//变量标签模版
|
||||
type VariableTemplate struct {
|
||||
Index int `json:"index"` //变量偏移量
|
||||
Name string `json:"name"` //变量名
|
||||
Label string `json:"lable"` //变量标签
|
||||
Value []ValueTemplate `json:"value"` //变量值
|
||||
Type string `json:"type"` //变量类型
|
||||
}
|
||||
//type VariableTemplate struct {
|
||||
// Index int `json:"index"` //变量偏移量
|
||||
// Name string `json:"name"` //变量名
|
||||
// Label string `json:"lable"` //变量标签
|
||||
// Value []ValueTemplate `json:"value"` //变量值
|
||||
// Type string `json:"type"` //变量类型
|
||||
//}
|
||||
|
||||
//设备模板
|
||||
type DeviceNodeTemplate struct {
|
||||
Index int `json:"Index"` //设备偏移量
|
||||
Name string `json:"Name"` //设备名称
|
||||
Addr string `json:"Addr"` //设备地址
|
||||
Type string `json:"Type"` //设备类型
|
||||
LastCommRTC string `json:"LastCommRTC"` //最后一次通信时间戳
|
||||
CommTotalCnt int `json:"CommTotalCnt"` //通信总次数
|
||||
CommSuccessCnt int `json:"CommSuccessCnt"` //通信成功次数
|
||||
CurCommFailCnt int `json:"-"` //当前通信失败次数
|
||||
CommStatus string `json:"CommStatus"` //通信状态
|
||||
VariableMap []VariableTemplate `json:"-"` //变量列表
|
||||
Index int `json:"Index"` //设备偏移量
|
||||
Name string `json:"Name"` //设备名称
|
||||
Addr string `json:"Addr"` //设备地址
|
||||
Type string `json:"Type"` //设备类型
|
||||
LastCommRTC string `json:"LastCommRTC"` //最后一次通信时间戳
|
||||
CommTotalCnt int `json:"CommTotalCnt"` //通信总次数
|
||||
CommSuccessCnt int `json:"CommSuccessCnt"` //通信成功次数
|
||||
CurCommFailCnt int `json:"-"` //当前通信失败次数
|
||||
CommStatus string `json:"CommStatus"` //通信状态
|
||||
//VariableMap []VariableTemplate `json:"-"` //变量列表
|
||||
Properties []DeviceTSLPropertyTemplate `json:"Properties"` //属性列表
|
||||
Services []DeviceTSLServiceTempalte `json:"Services"` //服务
|
||||
}
|
||||
|
||||
func (d *DeviceNodeTemplate) NewVariables() []VariableTemplate {
|
||||
//func (d *DeviceNodeTemplate) NewVariables() []VariableTemplate {
|
||||
//
|
||||
// type LuaVariableTemplate struct {
|
||||
// Index int
|
||||
// Name string
|
||||
// Label string
|
||||
// Type string
|
||||
// }
|
||||
//
|
||||
// type LuaVariableMapTemplate struct {
|
||||
// Variable []*LuaVariableTemplate
|
||||
// }
|
||||
//
|
||||
// lock.Lock()
|
||||
// //setting.Logger.Debugf("DeviceTypePluginMap %v", DeviceTypePluginMap)
|
||||
// for k, v := range DeviceNodeTypeMap.DeviceNodeType {
|
||||
// if d.Type == v.TemplateType {
|
||||
// //调用NewVariables
|
||||
// //setting.Logger.Debugf("TemplateType %v", v.TemplateType)
|
||||
// err := DeviceTypePluginMap[k].CallByParam(lua.P{
|
||||
// Fn: DeviceTypePluginMap[k].GetGlobal("NewVariables"),
|
||||
// NRet: 1,
|
||||
// Protect: true,
|
||||
// })
|
||||
// if err != nil {
|
||||
// setting.Logger.Warning("NewVariables err,", err)
|
||||
// }
|
||||
//
|
||||
// //获取返回结果
|
||||
// ret := DeviceTypePluginMap[k].Get(-1)
|
||||
// DeviceTypePluginMap[k].Pop(1)
|
||||
// //setting.Logger.Debugf("DeviceTypePluginMap Get,%v", ret)
|
||||
//
|
||||
// LuaVariableMap := LuaVariableMapTemplate{}
|
||||
//
|
||||
// if err := gluamapper.Map(ret.(*lua.LTable), &LuaVariableMap); err != nil {
|
||||
// setting.Logger.Warning("NewVariables gluamapper.Map err,", err)
|
||||
// }
|
||||
//
|
||||
// variables := make([]VariableTemplate, 0)
|
||||
//
|
||||
// for _, v := range LuaVariableMap.Variable {
|
||||
// variable := VariableTemplate{}
|
||||
// variable.Index = v.Index
|
||||
// variable.Name = v.Name
|
||||
// variable.Label = v.Label
|
||||
// variable.Type = v.Type
|
||||
//
|
||||
// variable.Value = make([]ValueTemplate, 0)
|
||||
// variables = append(variables, variable)
|
||||
// }
|
||||
// lock.Unlock()
|
||||
// //log.Printf("variables %v\n",variables)
|
||||
// return variables
|
||||
// }
|
||||
// }
|
||||
// lock.Unlock()
|
||||
// return nil
|
||||
//}
|
||||
|
||||
type LuaVariableTemplate struct {
|
||||
Index int
|
||||
Name string
|
||||
Label string
|
||||
Type string
|
||||
}
|
||||
func (d *DeviceNodeTemplate) NewVariables() []DeviceTSLPropertyTemplate {
|
||||
|
||||
type LuaVariableMapTemplate struct {
|
||||
Variable []*LuaVariableTemplate
|
||||
}
|
||||
properties := make([]DeviceTSLPropertyTemplate, 0)
|
||||
|
||||
lock.Lock()
|
||||
//setting.Logger.Debugf("DeviceTypePluginMap %v", DeviceTypePluginMap)
|
||||
for k, v := range DeviceNodeTypeMap.DeviceNodeType {
|
||||
if d.Type == v.TemplateType {
|
||||
//调用NewVariables
|
||||
//setting.Logger.Debugf("TemplateType %v", v.TemplateType)
|
||||
err := DeviceTypePluginMap[k].CallByParam(lua.P{
|
||||
Fn: DeviceTypePluginMap[k].GetGlobal("NewVariables"),
|
||||
NRet: 1,
|
||||
Protect: true,
|
||||
})
|
||||
if err != nil {
|
||||
setting.Logger.Warning("NewVariables err,", err)
|
||||
}
|
||||
|
||||
//获取返回结果
|
||||
ret := DeviceTypePluginMap[k].Get(-1)
|
||||
DeviceTypePluginMap[k].Pop(1)
|
||||
//setting.Logger.Debugf("DeviceTypePluginMap Get,%v", ret)
|
||||
|
||||
LuaVariableMap := LuaVariableMapTemplate{}
|
||||
|
||||
if err := gluamapper.Map(ret.(*lua.LTable), &LuaVariableMap); err != nil {
|
||||
setting.Logger.Warning("NewVariables gluamapper.Map err,", err)
|
||||
}
|
||||
|
||||
variables := make([]VariableTemplate, 0)
|
||||
|
||||
for _, v := range LuaVariableMap.Variable {
|
||||
variable := VariableTemplate{}
|
||||
variable.Index = v.Index
|
||||
variable.Name = v.Name
|
||||
variable.Label = v.Label
|
||||
variable.Type = v.Type
|
||||
|
||||
variable.Value = make([]ValueTemplate, 0)
|
||||
variables = append(variables, variable)
|
||||
}
|
||||
lock.Unlock()
|
||||
//log.Printf("variables %v\n",variables)
|
||||
return variables
|
||||
for _, v := range DeviceTSLMap {
|
||||
if v.Name == d.Type {
|
||||
properties = append(properties, v.Properties...)
|
||||
}
|
||||
}
|
||||
lock.Unlock()
|
||||
return nil
|
||||
|
||||
return properties
|
||||
}
|
||||
|
||||
func (d *DeviceNodeTemplate) NewServices() []DeviceTSLServiceTempalte {
|
||||
|
||||
services := make([]DeviceTSLServiceTempalte, 0)
|
||||
|
||||
for _, v := range DeviceTSLMap {
|
||||
if v.Name == d.Type {
|
||||
services = append(services, v.Services...)
|
||||
}
|
||||
}
|
||||
|
||||
return services
|
||||
}
|
||||
|
||||
func (d *DeviceNodeTemplate) GenerateGetRealVariables(sAddr string, step int) ([]byte, bool, bool) {
|
||||
|
@ -115,14 +142,15 @@ func (d *DeviceNodeTemplate) GenerateGetRealVariables(sAddr string, step int) ([
|
|||
lock.Lock()
|
||||
for k, v := range DeviceNodeTypeMap.DeviceNodeType {
|
||||
if d.Type == v.TemplateType {
|
||||
//调用NewVariables
|
||||
//调用GenerateGetRealVariables
|
||||
err := DeviceTypePluginMap[k].CallByParam(lua.P{
|
||||
Fn: DeviceTypePluginMap[k].GetGlobal("GenerateGetRealVariables"),
|
||||
NRet: 1,
|
||||
Protect: true,
|
||||
}, lua.LString(sAddr), lua.LNumber(step))
|
||||
}, lua.LString(sAddr),
|
||||
lua.LNumber(step))
|
||||
if err != nil {
|
||||
setting.Logger.Warning("GenerateGetRealVariables err,", err)
|
||||
setting.Logger.Warning("GenerateGetRealVariables err ", err)
|
||||
}
|
||||
|
||||
//获取返回结果
|
||||
|
@ -134,24 +162,24 @@ func (d *DeviceNodeTemplate) GenerateGetRealVariables(sAddr string, step int) ([
|
|||
setting.Logger.Warning("GenerateGetRealVariables gluamapper.Map err,", err)
|
||||
}
|
||||
|
||||
ok := false
|
||||
con := false //后续是否有报文
|
||||
result := false
|
||||
continuous := false //后续是否有报文
|
||||
nBytes := make([]byte, 0)
|
||||
if len(LuaVariableMap.Variable) > 0 {
|
||||
ok = true
|
||||
result = true
|
||||
for _, v := range LuaVariableMap.Variable {
|
||||
nBytes = append(nBytes, *v)
|
||||
}
|
||||
if LuaVariableMap.Status == "0" {
|
||||
con = false
|
||||
continuous = false
|
||||
} else {
|
||||
con = true
|
||||
continuous = true
|
||||
}
|
||||
} else {
|
||||
ok = true
|
||||
result = true
|
||||
}
|
||||
lock.Unlock()
|
||||
return nBytes, ok, con
|
||||
return nBytes, result, continuous
|
||||
}
|
||||
}
|
||||
lock.Unlock()
|
||||
|
@ -198,24 +226,24 @@ func (d *DeviceNodeTemplate) DeviceCustomCmd(sAddr string, cmdName string, cmdPa
|
|||
return nil, false, false
|
||||
}
|
||||
|
||||
ok := false
|
||||
con := false //后续是否有报文
|
||||
result := false
|
||||
continuous := false //后续是否有报文
|
||||
if LuaVariableMap.Status == "0" {
|
||||
con = false
|
||||
continuous = false
|
||||
} else {
|
||||
con = true
|
||||
continuous = true
|
||||
}
|
||||
nBytes := make([]byte, 0)
|
||||
if len(LuaVariableMap.Variable) > 0 {
|
||||
ok = true
|
||||
result = true
|
||||
for _, v := range LuaVariableMap.Variable {
|
||||
nBytes = append(nBytes, *v)
|
||||
}
|
||||
} else {
|
||||
ok = false
|
||||
result = false
|
||||
}
|
||||
lock.Unlock()
|
||||
return nBytes, ok, con
|
||||
return nBytes, result, continuous
|
||||
}
|
||||
}
|
||||
lock.Unlock()
|
||||
|
@ -231,7 +259,7 @@ func getGoroutineID() uint64 {
|
|||
return n
|
||||
}
|
||||
|
||||
func (d *DeviceNodeTemplate) AnalysisRx(sAddr string, variables []VariableTemplate, rxBuf []byte, rxBufCnt int) chan bool {
|
||||
func (d *DeviceNodeTemplate) AnalysisRx(sAddr string, variables []DeviceTSLPropertyTemplate, rxBuf []byte, rxBufCnt int) chan bool {
|
||||
|
||||
status := make(chan bool, 1)
|
||||
|
||||
|
@ -266,29 +294,35 @@ func (d *DeviceNodeTemplate) AnalysisRx(sAddr string, variables []VariableTempla
|
|||
}, lua.LString(sAddr), lua.LNumber(rxBufCnt))
|
||||
if err != nil {
|
||||
setting.Logger.Warning("AnalysisRx err,", err)
|
||||
lock.Unlock()
|
||||
return status
|
||||
}
|
||||
|
||||
//获取返回结果
|
||||
ret := DeviceTypePluginMap[k].Get(-1)
|
||||
if ret == nil {
|
||||
lock.Unlock()
|
||||
return status
|
||||
}
|
||||
DeviceTypePluginMap[k].Pop(1)
|
||||
|
||||
LuaVariableMap := LuaVariableMapTemplate{}
|
||||
|
||||
if err := gluamapper.Map(ret.(*lua.LTable), &LuaVariableMap); err != nil {
|
||||
setting.Logger.Warning("AnalysisRx gluamapper.Map err,", err)
|
||||
setting.Logger.Warning("AnalysisRx gluamapper.Map err ", err)
|
||||
}
|
||||
|
||||
timeNowStr := time.Now().Format("2006-01-02 15:04:05")
|
||||
value := ValueTemplate{}
|
||||
value := DeviceTSLPropertyValueTemplate{}
|
||||
if LuaVariableMap.Status == "0" {
|
||||
if len(LuaVariableMap.Variable) > 0 {
|
||||
for _, lv := range LuaVariableMap.Variable {
|
||||
for k, v := range variables {
|
||||
if lv.Index == v.Index {
|
||||
variables[k].Index = lv.Index
|
||||
variables[k].Name = lv.Name
|
||||
variables[k].Label = lv.Label
|
||||
variables[k].Type = lv.Type
|
||||
for k, p := range variables {
|
||||
if lv.Name == p.Name {
|
||||
//variables[k].Index = lv.Index
|
||||
//variables[k].Name = lv.Name
|
||||
//variables[k].Explain = lv.Label
|
||||
//variables[k].Type = lv.Type
|
||||
|
||||
switch lv.Type {
|
||||
case "uint8":
|
||||
|
|
|
@ -31,6 +31,12 @@ type DeviceTSLPropertyParamTempate struct {
|
|||
Decimals int //小数位数
|
||||
}
|
||||
|
||||
type DeviceTSLPropertyValueTemplate struct {
|
||||
Value interface{} //变量值,不可以是字符串
|
||||
Explain interface{} //变量值解释,必须是字符串
|
||||
TimeStamp string
|
||||
}
|
||||
|
||||
type DeviceTSLPropertyTemplate struct {
|
||||
Name string //属性名称,只可以是字母和数字的组合
|
||||
Explain string //属性解释
|
||||
|
@ -38,6 +44,7 @@ type DeviceTSLPropertyTemplate struct {
|
|||
Unit string //单位
|
||||
Type int //类型 uint32 int32 double string
|
||||
Params DeviceTSLPropertyParamTempate
|
||||
Value []DeviceTSLPropertyValueTemplate
|
||||
}
|
||||
|
||||
type DeviceTSLServiceTempalte struct {
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
package eventBus
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Bus type contains all topics and subscribers
|
||||
type Bus struct {
|
||||
subNode map[string]*node
|
||||
rw sync.RWMutex
|
||||
}
|
||||
|
||||
func NewBus() Bus {
|
||||
return Bus{
|
||||
subNode: make(map[string]*node),
|
||||
rw: sync.RWMutex{},
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe means a subscriber follw a topic
|
||||
func (b *Bus) Subscribe(topic string, sub Sub) {
|
||||
b.rw.Lock()
|
||||
if n, ok := b.subNode[topic]; ok {
|
||||
// found the node
|
||||
b.rw.Unlock()
|
||||
n.rw.Lock()
|
||||
defer n.rw.Unlock()
|
||||
n.subs = append(n.subs, sub)
|
||||
} else {
|
||||
defer b.rw.Unlock()
|
||||
n := NewNode()
|
||||
b.subNode[topic] = &n
|
||||
n.subs = append(n.subs, sub)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Bus) UnSubscribe(topic string, sub Sub) {
|
||||
b.rw.Lock()
|
||||
if n, ok := b.subNode[topic]; ok && (n.SubsLen() > 0) {
|
||||
b.rw.Unlock()
|
||||
b.subNode[topic].removeSub(sub)
|
||||
} else {
|
||||
b.rw.Unlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Publish msg to all subscriber who have subscribed to the topic
|
||||
func (b *Bus) Publish(topic string, msg interface{}) error {
|
||||
b.rw.Lock()
|
||||
if n, ok := b.subNode[topic]; ok {
|
||||
// found the node
|
||||
b.rw.Unlock()
|
||||
n.rw.RLock()
|
||||
defer n.rw.RUnlock()
|
||||
// got the subs list and publish msg
|
||||
go func(subs []Sub, msg interface{}) {
|
||||
for _, sub := range subs {
|
||||
sub.receive(msg)
|
||||
}
|
||||
}(n.subs, msg)
|
||||
// successed return null
|
||||
return nil
|
||||
} else {
|
||||
// topic not exist
|
||||
defer b.rw.Unlock()
|
||||
return errors.New("topic not exist")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// PubFunc return a function that publish msg to one topic
|
||||
func (b *Bus) PubFunc(topic string) func(msg interface{}) {
|
||||
return func(msg interface{}) {
|
||||
b.Publish(topic, msg)
|
||||
}
|
||||
}
|
||||
|
||||
// SubsLen return the length of subs contained topic node
|
||||
func (b *Bus) SubsLen(topic string) (int, error) {
|
||||
b.rw.Lock()
|
||||
if n, ok := b.subNode[topic]; ok {
|
||||
// found the node
|
||||
b.rw.Unlock()
|
||||
n.rw.RLock()
|
||||
defer n.rw.RUnlock()
|
||||
return n.SubsLen(), nil
|
||||
} else {
|
||||
// topic not exist
|
||||
defer b.rw.Unlock()
|
||||
return 0, errors.New("topic not exist")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
package eventBus
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// node contains a slice of subs that subscribe same topic
|
||||
type node struct {
|
||||
subs []Sub
|
||||
// Note node's rw will not be used when bus's rw is helded.
|
||||
rw sync.RWMutex
|
||||
}
|
||||
|
||||
// NewNode return new node
|
||||
func NewNode() node {
|
||||
return node{
|
||||
subs: []Sub{},
|
||||
rw: sync.RWMutex{},
|
||||
}
|
||||
}
|
||||
|
||||
func (n *node) SubsLen() int {
|
||||
return len(n.subs)
|
||||
}
|
||||
|
||||
// if sub not exisit, no change
|
||||
func (n *node) removeSub(s Sub) {
|
||||
// lenOfSubs should placed before Lock Statement, otherwise it will cause a deadlock.
|
||||
lenOfSubs := len(n.subs)
|
||||
n.rw.Lock()
|
||||
defer n.rw.Unlock()
|
||||
idx := n.findSubIdx(s)
|
||||
if idx < 0 {
|
||||
return
|
||||
}
|
||||
copy(n.subs[idx:], n.subs[idx+1:])
|
||||
n.subs[lenOfSubs-1] = Sub{}
|
||||
n.subs = n.subs[:lenOfSubs-1]
|
||||
}
|
||||
|
||||
//findSubIdx return index of sub, if sub not exisit return -1.
|
||||
func (n *node) findSubIdx(s Sub) int {
|
||||
for idx, sub := range n.subs {
|
||||
if sub == s {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
package eventBus
|
||||
|
||||
// Sub contains subscriber's informations ,like channel etc.
|
||||
type Sub struct {
|
||||
out chan interface{}
|
||||
}
|
||||
|
||||
func NewSub() Sub {
|
||||
return Sub{
|
||||
out: make(chan interface{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Sub) receive(msg interface{}) {
|
||||
s.out <- msg
|
||||
}
|
||||
|
||||
// Out return Sub.out channel
|
||||
func (s *Sub) Out() (msg interface{}) {
|
||||
return (<-s.out)
|
||||
}
|
5
go.mod
5
go.mod
|
@ -16,8 +16,6 @@ require (
|
|||
github.com/mitchellh/mapstructure v1.4.1 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/robfig/cron v1.2.0
|
||||
github.com/safchain/ethtool v0.0.0-20201023143004-874930cb3ce0
|
||||
github.com/shirou/gopsutil v3.21.3+incompatible
|
||||
github.com/shirou/gopsutil/v3 v3.21.3
|
||||
github.com/sirupsen/logrus v1.8.1
|
||||
github.com/smartystreets/goconvey v1.6.4 // indirect
|
||||
|
@ -25,7 +23,8 @@ require (
|
|||
github.com/thinkgos/gomodbus/v2 v2.2.2
|
||||
github.com/yuin/gluamapper v0.0.0-20150323120927-d836955830e7
|
||||
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da
|
||||
golang.org/x/tools v0.1.5 // indirect
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect
|
||||
gopkg.in/ini.v1 v1.62.0
|
||||
layeh.com/gopher-luar v1.0.8
|
||||
)
|
||||
|
|
31
go.sum
31
go.sum
|
@ -3,11 +3,8 @@ github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 h1:5sXbqlSomvdjl
|
|||
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
|
||||
github.com/beevik/ntp v0.3.0 h1:xzVrPrE4ziasFXgBVBZJDP0Wg/KpMwk2KHJ4Ba8GrDw=
|
||||
github.com/beevik/ntp v0.3.0/go.mod h1:hIHWr+l3+/clUnF44zdK+CWW7fO8dR5cIylAQ76NRpg=
|
||||
github.com/chzyer/logex v1.1.10 h1:Swpa1K6QvQznwJRcfTfQJmTE72DqScAa40E+fbHEXEE=
|
||||
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5OhCuC+XN+r/bBCmeuuJtjz+bCNIf8=
|
||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWsoZXJNW3xEE4JJyHa5Q25/sd8=
|
||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
|
@ -35,7 +32,6 @@ github.com/goburrow/serial v0.1.0 h1:v2T1SQa/dlUqQiYIT8+Cu7YolfqAi3K96UmhwYyuSrA
|
|||
github.com/goburrow/serial v0.1.0/go.mod h1:sAiqG0nRVswsm1C97xsttiYCzSLBmUZ/VSlVLZJ8haA=
|
||||
github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I=
|
||||
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
|
||||
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
||||
|
@ -70,10 +66,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
|
|||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
|
||||
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
|
||||
github.com/safchain/ethtool v0.0.0-20201023143004-874930cb3ce0 h1:eskphjc5kRCykOJyX7HHVbJCs25/8knprttvrVvEd8o=
|
||||
github.com/safchain/ethtool v0.0.0-20201023143004-874930cb3ce0/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4=
|
||||
github.com/shirou/gopsutil v3.21.3+incompatible h1:uenXGGa8ESCQq+dbgtl916dmg6PSAz2cXov0uORQ9v8=
|
||||
github.com/shirou/gopsutil v3.21.3+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
|
||||
github.com/shirou/gopsutil/v3 v3.21.3 h1:wgcdAHZS2H6qy4JFewVTtqfiYxFzCeEJod/mLztdPG8=
|
||||
github.com/shirou/gopsutil/v3 v3.21.3/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw=
|
||||
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
|
||||
|
@ -82,7 +74,6 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE
|
|||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
|
||||
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
|
||||
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
|
||||
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
|
@ -93,9 +84,7 @@ github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07 h1:UyzmZLoiDWMRywV4DUY
|
|||
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
|
||||
github.com/thinkgos/gomodbus/v2 v2.2.2 h1:2ZkVxPShxg9ew0UCANdRxL7ZGA6anwRo66e8zifo+6w=
|
||||
github.com/thinkgos/gomodbus/v2 v2.2.2/go.mod h1:YynqZtDx2kFRhA+Ajy9qtmWOUIs5BOi3EaX6WQwH/jo=
|
||||
github.com/tklauser/go-sysconf v0.3.4 h1:HT8SVixZd3IzLdfs/xlpq0jeSfTX57g1v6wB1EuzV7M=
|
||||
github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek=
|
||||
github.com/tklauser/numcpus v0.2.1 h1:ct88eFm+Q7m2ZfXJdan1xYoXKlmwsfP+k88q05KvlZc=
|
||||
github.com/tklauser/numcpus v0.2.1/go.mod h1:9aU+wOc6WjUIZEwWMP62PL/41d65P+iks1gBkr4QyP8=
|
||||
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
|
||||
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
|
||||
|
@ -103,27 +92,17 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs
|
|||
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
|
||||
github.com/yuin/gluamapper v0.0.0-20150323120927-d836955830e7 h1:noHsffKZsNfU38DwcXWEPldrTjIZ8FPNKx8mYMGnqjs=
|
||||
github.com/yuin/gluamapper v0.0.0-20150323120927-d836955830e7/go.mod h1:bbMEM6aU1WDF1ErA5YJ0p91652pGv140gGw4Ww3RGp8=
|
||||
github.com/yuin/goldmark v1.3.5 h1:dPmz1Snjq0kmkz159iL7S6WzdahUTHnHB5M56WFVifs=
|
||||
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
|
||||
github.com/yuin/gopher-lua v0.0.0-20190206043414-8bfc7677f583/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
|
||||
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da h1:NimzV1aGyq29m5ukMK0AMWEhFaL/lrEOaephfuoiARg=
|
||||
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
|
||||
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
|
||||
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
|
@ -132,26 +111,16 @@ golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7w
|
|||
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210217105451-b926d437f341 h1:2/QtM1mL37YmcsT8HaDNHDgTqqFVw+zr8UzMiBVLzYU=
|
||||
golang.org/x/sys v0.0.0-20210217105451-b926d437f341/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
|
||||
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU=
|
||||
|
|
|
@ -547,18 +547,18 @@ func apiGetNodeVariableFromCache(context *gin.Context) {
|
|||
|
||||
for _, v := range device.CollectInterfaceMap {
|
||||
if v.CollInterfaceName == sName {
|
||||
for _, v := range v.DeviceNodeMap {
|
||||
if v.Addr == sAddr {
|
||||
for _, d := range v.DeviceNodeMap {
|
||||
if d.Addr == sAddr {
|
||||
|
||||
aParam.Code = "0"
|
||||
aParam.Message = ""
|
||||
aParam.Data = make([]VariableTemplate, 0)
|
||||
index := 0
|
||||
variable := VariableTemplate{}
|
||||
for _, v := range v.VariableMap {
|
||||
variable.Index = v.Index
|
||||
for _, v := range d.Properties {
|
||||
//variable.Index = v.Index
|
||||
variable.Name = v.Name
|
||||
variable.Label = v.Label
|
||||
variable.Label = v.Explain
|
||||
// 取出切片中最后一个值
|
||||
if len(v.Value) > 0 {
|
||||
index = len(v.Value) - 1
|
||||
|
@ -570,7 +570,7 @@ func apiGetNodeVariableFromCache(context *gin.Context) {
|
|||
variable.Explain = ""
|
||||
variable.TimeStamp = ""
|
||||
}
|
||||
variable.Type = v.Type
|
||||
//variable.Type = v.Type
|
||||
aParam.Data = append(aParam.Data, variable)
|
||||
}
|
||||
|
||||
|
@ -597,19 +597,18 @@ func apiGetNodeHistoryVariableFromCache(context *gin.Context) {
|
|||
aParam := &struct {
|
||||
Code string
|
||||
Message string
|
||||
Data []device.ValueTemplate
|
||||
Data []device.DeviceTSLPropertyValueTemplate
|
||||
}{}
|
||||
|
||||
for _, v := range device.CollectInterfaceMap {
|
||||
if v.CollInterfaceName == sName {
|
||||
for _, v := range v.DeviceNodeMap {
|
||||
if v.Addr == sAddr {
|
||||
|
||||
for _, d := range v.DeviceNodeMap {
|
||||
if d.Addr == sAddr {
|
||||
aParam.Code = "0"
|
||||
aParam.Message = ""
|
||||
for _, v := range v.VariableMap {
|
||||
for _, v := range d.Properties {
|
||||
if v.Name == sVariable {
|
||||
aParam.Data = v.Value
|
||||
aParam.Data = append(aParam.Data, v.Value...)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -683,10 +682,10 @@ func apiGetNodeReadVariable(context *gin.Context) {
|
|||
aParam.Data = make([]VariableTemplate, 0)
|
||||
index := 0
|
||||
variable := VariableTemplate{}
|
||||
for _, v := range c.DeviceNodeMap[nodeIndex].VariableMap {
|
||||
variable.Index = v.Index
|
||||
for _, v := range c.DeviceNodeMap[nodeIndex].Properties {
|
||||
//variable.Index = v.Index
|
||||
variable.Name = v.Name
|
||||
variable.Label = v.Label
|
||||
variable.Label = v.Explain
|
||||
// 取出切片中最后一个值
|
||||
if len(v.Value) > 0 {
|
||||
index = len(v.Value) - 1
|
||||
|
@ -698,7 +697,7 @@ func apiGetNodeReadVariable(context *gin.Context) {
|
|||
variable.Explain = ""
|
||||
variable.TimeStamp = ""
|
||||
}
|
||||
variable.Type = v.Type
|
||||
//variable.Type = v.Type
|
||||
aParam.Data = append(aParam.Data, variable)
|
||||
}
|
||||
|
||||
|
@ -1090,13 +1089,13 @@ func apiAddDeviceTSLProperties(context *gin.Context) {
|
|||
fmt.Println(string(bodyBuf[:n]))
|
||||
|
||||
tslInfo := &struct {
|
||||
TSLName string `json:"TSLName"` // 名称
|
||||
Properties []device.DeviceTSLPropertyTemplate `json:"Properties"` // 描述
|
||||
TSLName string `json:"TSLName"` // 名称
|
||||
Properties device.DeviceTSLPropertyTemplate `json:"Property"` //
|
||||
}{}
|
||||
|
||||
err := json.Unmarshal(bodyBuf[:n], tslInfo)
|
||||
if err != nil {
|
||||
setting.Logger.Errorf("AddDeviceTSLProperties json unMarshall err %v", err)
|
||||
setting.Logger.Errorf("AddDeviceTSLProperty json unMarshall err %v", err)
|
||||
|
||||
aParam.Code = "1"
|
||||
aParam.Message = "json unMarshall err"
|
||||
|
@ -1105,7 +1104,7 @@ func apiAddDeviceTSLProperties(context *gin.Context) {
|
|||
context.String(http.StatusOK, string(sJson))
|
||||
return
|
||||
}
|
||||
setting.Logger.Debugf("AddDeviceTSLProperties %v", tslInfo)
|
||||
setting.Logger.Debugf("AddDeviceTSLProperty %v", tslInfo)
|
||||
|
||||
index := -1
|
||||
for k, v := range device.DeviceTSLMap {
|
||||
|
@ -1122,9 +1121,7 @@ func apiAddDeviceTSLProperties(context *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
for _, v := range tslInfo.Properties {
|
||||
_, err = device.DeviceTSLMap[index].DeviceTSLPropertiesAdd(v)
|
||||
}
|
||||
_, err = device.DeviceTSLMap[index].DeviceTSLPropertiesAdd(tslInfo.Properties)
|
||||
|
||||
aParam.Code = "0"
|
||||
aParam.Message = ""
|
||||
|
|
|
@ -125,7 +125,7 @@ func (r *ReportServiceParamAliyunTemplate) AllNodePropertyPost() {
|
|||
if c.CollInterfaceName == n.CollInterfaceName {
|
||||
for _, d := range c.DeviceNodeMap {
|
||||
if d.Name == n.Name {
|
||||
for _, v := range d.VariableMap {
|
||||
for _, v := range d.Properties {
|
||||
if len(v.Value) >= 1 {
|
||||
index := len(v.Value) - 1
|
||||
mqttAliyunValue := MQTTAliyunValueTemplate{}
|
||||
|
@ -181,7 +181,7 @@ func (r *ReportServiceParamAliyunTemplate) AllNodePropertyPost() {
|
|||
if c.CollInterfaceName == n.CollInterfaceName {
|
||||
for _, d := range c.DeviceNodeMap {
|
||||
if d.Name == n.Name {
|
||||
for _, v := range d.VariableMap {
|
||||
for _, v := range d.Properties {
|
||||
if len(v.Value) >= 1 {
|
||||
index := len(v.Value) - 1
|
||||
mqttAliyunValue := MQTTAliyunValueTemplate{}
|
||||
|
@ -264,7 +264,7 @@ func (r *ReportServiceParamAliyunTemplate) NodePropertyPost(name []string) {
|
|||
if c.CollInterfaceName == n.CollInterfaceName {
|
||||
for _, d := range c.DeviceNodeMap {
|
||||
if d.Name == n.Name {
|
||||
for _, v := range d.VariableMap {
|
||||
for _, v := range d.Properties {
|
||||
if len(v.Value) >= 1 {
|
||||
index := len(v.Value) - 1
|
||||
mqttAliyunValue := MQTTAliyunValueTemplate{}
|
||||
|
@ -317,7 +317,7 @@ func (r *ReportServiceParamAliyunTemplate) NodePropertyPost(name []string) {
|
|||
if c.CollInterfaceName == n.CollInterfaceName {
|
||||
for _, d := range c.DeviceNodeMap {
|
||||
if d.Name == n.Name {
|
||||
for _, v := range d.VariableMap {
|
||||
for _, v := range d.Properties {
|
||||
if len(v.Value) >= 1 {
|
||||
index := len(v.Value) - 1
|
||||
mqttAliyunValue := MQTTAliyunValueTemplate{}
|
||||
|
|
|
@ -99,7 +99,7 @@ func (r *ReportServiceParamEmqxTemplate) ReportServiceEmqxProcessReadProperty(re
|
|||
if ackData.Status {
|
||||
ReadStatus = true
|
||||
for _, p := range v.Properties {
|
||||
for _, variable := range n.VariableMap {
|
||||
for _, variable := range n.Properties {
|
||||
if p.Name == variable.Name {
|
||||
if len(variable.Value) >= 1 {
|
||||
index := len(variable.Value) - 1
|
||||
|
|
|
@ -174,7 +174,7 @@ func (r *ReportServiceParamEmqxTemplate) NodePropertyPost(name []string) {
|
|||
if c.CollInterfaceName == v.CollInterfaceName {
|
||||
for _, d := range c.DeviceNodeMap {
|
||||
if d.Name == v.Name {
|
||||
for _, v := range d.VariableMap {
|
||||
for _, v := range d.Properties {
|
||||
if len(v.Value) >= 1 {
|
||||
index := len(v.Value) - 1
|
||||
property := MQTTEmqxPropertyPostParamPropertyTemplate{}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"goAdapter/device"
|
||||
"goAdapter/device/eventBus"
|
||||
"goAdapter/setting"
|
||||
"log"
|
||||
"os"
|
||||
|
@ -62,6 +63,7 @@ type ReportServiceParamEmqxTemplate struct {
|
|||
ReceiveInvokeServiceAckFrameChan chan MQTTEmqxInvokeServiceAckTemplate `json:"-"`
|
||||
ReceiveWritePropertyRequestFrameChan chan MQTTEmqxWritePropertyRequestTemplate `json:"-"`
|
||||
ReceiveReadPropertyRequestFrameChan chan MQTTEmqxReadPropertyRequestTemplate `json:"-"`
|
||||
//CollEventSubMap []eventBus.Sub `json:"-"`
|
||||
}
|
||||
|
||||
type ReportServiceParamListEmqxTemplate struct {
|
||||
|
@ -90,6 +92,7 @@ func ReportServiceEmqxInit() {
|
|||
v.ReceiveInvokeServiceAckFrameChan = make(chan MQTTEmqxInvokeServiceAckTemplate, 50)
|
||||
v.ReceiveWritePropertyRequestFrameChan = make(chan MQTTEmqxWritePropertyRequestTemplate, 50)
|
||||
v.ReceiveReadPropertyRequestFrameChan = make(chan MQTTEmqxReadPropertyRequestTemplate, 50)
|
||||
//v.CollEventSubMap = make([]eventBus.Sub, 0)
|
||||
|
||||
go ReportServiceEmqxPoll(v)
|
||||
}
|
||||
|
@ -323,6 +326,14 @@ func (r *ReportServiceParamEmqxTemplate) ProcessDownLinkFrame() {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *ReportServiceParamEmqxTemplate) ProcessCollEvent(sub eventBus.Sub) {
|
||||
|
||||
for {
|
||||
msg := sub.Out().(device.CollectInterfaceEventTemplate)
|
||||
setting.Logger.Debugf("EMQX collEvent ReceiveMsgTopic %v ReceiveMsgContent %v", msg.Topic, msg.Content)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ReportServiceParamEmqxTemplate) LogIn(nodeName []string) {
|
||||
|
||||
//清空接收chan,避免出现有上次接收的缓存
|
||||
|
@ -440,8 +451,17 @@ func ReportServiceEmqxPoll(r *ReportServiceParamEmqxTemplate) {
|
|||
_ = cronProcess.AddFunc(reportOfflineTime, r.ReportOfflineTimeFun)
|
||||
_ = cronProcess.AddFunc(reportTime, r.ReportTimeFun)
|
||||
|
||||
go r.ProcessUpLinkFrame()
|
||||
//订阅采集接口消息
|
||||
for _, coll := range device.CollectInterfaceMap {
|
||||
sub := eventBus.NewSub()
|
||||
coll.CollEventBus.Subscribe("onLine", sub)
|
||||
coll.CollEventBus.Subscribe("offLine", sub)
|
||||
coll.CollEventBus.Subscribe("update", sub)
|
||||
//r.CollEventSubMap = append(r.CollEventSubMap, sub)
|
||||
go r.ProcessCollEvent(sub)
|
||||
}
|
||||
|
||||
go r.ProcessUpLinkFrame()
|
||||
go r.ProcessDownLinkFrame()
|
||||
|
||||
//name := make([]string, 0)
|
||||
|
|
|
@ -65,7 +65,7 @@ func ReportServiceHuaweiProcessGetProperties(r *ReportServiceParamHuaweiTemplate
|
|||
if cmdRX.Status == true {
|
||||
setting.Logger.Debugf("GetRealVariables ok")
|
||||
service := MQTTHuaweiServiceTemplate{}
|
||||
for _, v := range device.CollectInterfaceMap[y].DeviceNodeMap[i].VariableMap {
|
||||
for _, v := range device.CollectInterfaceMap[y].DeviceNodeMap[i].Properties {
|
||||
if v.Name == request.ServiceID {
|
||||
if len(v.Value) >= 1 {
|
||||
index := len(v.Value) - 1
|
||||
|
|
|
@ -206,7 +206,7 @@ func (r *ReportServiceParamHuaweiTemplate) NodePropertyPost(name []string) {
|
|||
for _, d := range c.DeviceNodeMap {
|
||||
if d.Name == n.Name {
|
||||
ServiceMap := make([]MQTTHuaweiServiceTemplate, 0)
|
||||
for _, v := range d.VariableMap {
|
||||
for _, v := range d.Properties {
|
||||
if len(v.Value) >= 1 {
|
||||
index := len(v.Value) - 1
|
||||
service := MQTTHuaweiServiceTemplate{}
|
||||
|
@ -254,7 +254,7 @@ func (r *ReportServiceParamHuaweiTemplate) NodePropertyPost(name []string) {
|
|||
for _, d := range c.DeviceNodeMap {
|
||||
if d.Name == n.Name {
|
||||
ServiceMap := make([]MQTTHuaweiServiceTemplate, 0)
|
||||
for _, v := range d.VariableMap {
|
||||
for _, v := range d.Properties {
|
||||
if len(v.Value) >= 1 {
|
||||
index := len(v.Value) - 1
|
||||
service := MQTTHuaweiServiceTemplate{}
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
{"DeviceNodeType":[{"templateID":0,"templateName":"风机盘管控制器","templateType":"wdt200","templateMessage":""}]}
|
Loading…
Reference in New Issue