修改:1、将commManage中接收放到独立的协程中,可以处理主动上报的数据 2、deviceNode增加自定义命令,customer

This commit is contained in:
pengwang 2020-10-20 15:03:23 +08:00
parent 3d78e4cbd1
commit 174b99d030
3 changed files with 519 additions and 338 deletions

View File

@ -3,51 +3,89 @@ package device
import (
"fmt"
"github.com/sirupsen/logrus"
"goAdapter/setting"
"log"
"strconv"
"time"
"goAdapter/setting"
)
type CommunicationCmdTemplate struct {
CollInterfaceName string //采集接口名称
DeviceAddr string //采集接口下设备地址
FunName string
FunPara interface{}
CollInterfaceName string //采集接口名称
DeviceAddr string //采集接口下设备地址
FunName string
FunPara string
}
type CommunicationManageTemplate struct{
type CommunicationManageTemplate struct {
EmergencyRequestChan chan CommunicationCmdTemplate
CommonRequestChan chan CommunicationCmdTemplate
EmergencyAckChan chan bool
CollInterface *CollectInterfaceTemplate
PacketChan chan []byte
}
var CommunicationManage = make([]*CommunicationManageTemplate, 0)
func NewCommunicationManageTemplate(coll *CollectInterfaceTemplate) *CommunicationManageTemplate {
template := &CommunicationManageTemplate{
EmergencyRequestChan: make(chan CommunicationCmdTemplate, 1),
CommonRequestChan: make(chan CommunicationCmdTemplate, 100),
EmergencyAckChan: make(chan bool, 1),
PacketChan: make(chan []byte, 10),
CollInterface: coll,
}
//启动接收协程
go template.AnalysisRx()
return template
}
func (c *CommunicationManageTemplate)CommunicationManageAddCommon(cmd CommunicationCmdTemplate) {
func (c *CommunicationManageTemplate) CommunicationManageAddCommon(cmd CommunicationCmdTemplate) {
c.CommonRequestChan <- cmd
}
func (c *CommunicationManageTemplate)CommunicationManageAddEmergency(cmd CommunicationCmdTemplate) bool {
func (c *CommunicationManageTemplate) CommunicationManageAddEmergency(cmd CommunicationCmdTemplate) bool {
c.EmergencyRequestChan <- cmd
return <-c.EmergencyAckChan
}
func (c *CommunicationManageTemplate)CommunicationManageDel() {
func (c *CommunicationManageTemplate) AnalysisRx() {
//阻塞读
rxBuf := make([]byte, 256)
rxBufCnt := 0
serialPort := &CommunicationSerialTemplate{}
for k, v := range CommunicationSerialMap {
if v.Name == c.CollInterface.CommInterfaceName {
serialPort = CommunicationSerialMap[k]
}
}
for {
//阻塞读
rxBufCnt = serialPort.ReadData(rxBuf)
if rxBufCnt > 0 {
//log.Printf("rxBufCnt %v\n",rxBufCnt)
//log.Printf("rxBuf %X\n",rxBuf[:rxBufCnt])
//rxTotalBufCnt += rxBufCnt
//追加接收的数据到接收缓冲区
//rxTotalBuf = append(rxTotalBuf, rxBuf[:rxBufCnt]...)
c.PacketChan <- rxBuf[:rxBufCnt]
//清除本次接收数据
rxBufCnt = 0
}
}
}
func (c *CommunicationManageTemplate) CommunicationManageDel() {
for {
select {
@ -55,96 +93,150 @@ func (c *CommunicationManageTemplate)CommunicationManageDel() {
{
log.Println("emergency chan")
log.Printf("funName %s\n", cmd.FunName)
setting.Logger.WithFields(logrus.Fields{
"collName": c.CollInterface.CollInterfaceName,
"deviceAddr": cmd.DeviceAddr,
"funName": cmd.FunName,
}).Info("emergency chan")
status := false
for _, c := range CollectInterfaceMap {
if c.CollInterfaceName == cmd.CollInterfaceName {
for k,v := range c.DeviceNodeMap{
if v.Addr == cmd.DeviceAddr {
log.Printf("index is %d\n", k)
step := 0
for{
//--------------组包---------------------------
txBuf,ok := v.GenerateGetRealVariables(v.Addr,step)
if ok == false{
//log.Printf("getVariables false\n")
goto LoopEmergency
}
step++
log.Printf("tx buf is %X\n", txBuf)
//---------------发送-------------------------
var timeout int
var interval int
for _,v := range CommunicationSerialMap{
if v.Name == c.CommInterfaceName{
v.WriteData(txBuf)
timeout,_ = strconv.Atoi(v.Param.Timeout)
interval,_ = strconv.Atoi(v.Param.Interval)
}
}
v.CommTotalCnt++
//---------------等待接收----------------------
//阻塞读
rxBuf := make([]byte, 256)
rxTotalBuf := make([]byte, 0)
rxBufCnt := 0
rxTotalBufCnt := 0
timerOut := time.NewTimer(time.Duration(timeout) * time.Millisecond)
for {
select {
//是否正确收到数据包
case <-v.AnalysisRx(v.Addr, v.VariableMap, rxTotalBuf, rxTotalBufCnt):
{
log.Println("rx ok")
log.Printf("rxbuf %X\n", rxTotalBuf)
//通信帧延时
time.Sleep(time.Duration(interval)*time.Millisecond)
v.CommSuccessCnt++
v.CurCommFailCnt = 0
v.CommStatus = "onLine"
v.LastCommRTC = time.Now().Format("2006-01-02 15:04:05")
rxTotalBufCnt = 0
rxTotalBuf = rxTotalBuf[0:0]
goto LoopEmergencyStep
}
//是否接收超时
case <-timerOut.C:
{
log.Println("rx timeout")
//通信帧延时
time.Sleep(time.Duration(interval)*time.Millisecond)
v.CurCommFailCnt++
if v.CurCommFailCnt >= c.OfflinePeriod{
v.CurCommFailCnt = 0
v.CommStatus = "offLine"
}
rxTotalBufCnt = 0
rxTotalBuf = rxTotalBuf[0:0]
goto LoopEmergencyStep
}
//继续接收数据
default:
{
//rxBufCnt,_ = setting.SerialInterface.SerialPort[cmd.InterfaceID].Read(rxBuf)
for _,v := range CommunicationSerialMap{
if v.Name == c.CommInterfaceName{
rxBufCnt = v.ReadData(rxBuf)
}
}
if rxBufCnt > 0 {
rxTotalBufCnt += rxBufCnt
//追加接收的数据到接收缓冲区
rxTotalBuf = append(rxTotalBuf, rxBuf[:rxBufCnt]...)
//清除本地接收数据
rxBufCnt = 0
}
}
}
}
LoopEmergencyStep:
}
LoopEmergency:
for _, v := range c.CollInterface.DeviceNodeMap {
if v.Addr == cmd.DeviceAddr {
log.Printf("%v:addr %v\n", c.CollInterface.CollInterfaceName, v.Addr)
step := 0
for {
//--------------组包---------------------------
txBuf := make([]byte, 0)
con := false
ok := false
txBuf, ok, con = v.DeviceCustomCmd(cmd.DeviceAddr, cmd.FunName, cmd.FunPara, step)
if ok == false {
log.Printf("DeviceCustomCmd false\n")
goto LoopEmergency
}
step++
log.Printf("%v:txbuf %X\n", c.CollInterface.CollInterfaceName, txBuf)
CommunicationMessage := CommunicationMessageTemplate{
CollName: c.CollInterface.CollInterfaceName,
TimeStamp: time.Now().Format("2006-01-02 15:04:05"),
Direction: "send",
Content: fmt.Sprintf("%X", txBuf),
}
if len(c.CollInterface.CommMessage) < 1024 {
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage, CommunicationMessage)
} else {
c.CollInterface.CommMessage = c.CollInterface.CommMessage[1:]
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage, CommunicationMessage)
}
//---------------发送-------------------------
var timeout int
var interval int
//判断是否是串口采集
for _, v := range CommunicationSerialMap {
if v.Name == c.CollInterface.CommInterfaceName {
v.WriteData(txBuf)
timeout, _ = strconv.Atoi(v.Param.Timeout)
interval, _ = strconv.Atoi(v.Param.Interval)
}
}
v.CommTotalCnt++
//---------------等待接收----------------------
//阻塞读
rxBuf := make([]byte, 256)
rxTotalBuf := make([]byte, 0)
rxBufCnt := 0
rxTotalBufCnt := 0
timerOut := time.NewTimer(time.Duration(timeout) * time.Millisecond)
if (cmd.FunName == "EraseFlash") || (cmd.FunName == "SendFilePkt") {
timerOut.Reset(time.Duration(5000) * time.Millisecond)
} else {
timerOut.Reset(time.Duration(timeout) * time.Millisecond)
}
for {
select {
//是否正确收到数据包
case <-v.AnalysisRx(v.Addr, v.VariableMap, rxTotalBuf, rxTotalBufCnt):
{
log.Printf("%v:rx ok\n", c.CollInterface.CollInterfaceName)
log.Printf("%v:rxbuf %X\n", c.CollInterface.CollInterfaceName, rxTotalBuf)
CommunicationMessage := CommunicationMessageTemplate{
CollName: c.CollInterface.CollInterfaceName,
TimeStamp: time.Now().Format("2006-01-02 15:04:05"),
Direction: "receive",
Content: fmt.Sprintf("%X", rxTotalBuf),
}
if len(c.CollInterface.CommMessage) < 1024 {
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage, CommunicationMessage)
} else {
c.CollInterface.CommMessage = c.CollInterface.CommMessage[1:]
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage, CommunicationMessage)
}
status = true
if con == true {
//通信帧延时
time.Sleep(time.Duration(interval) * time.Millisecond)
}
v.CommSuccessCnt++
v.CurCommFailCnt = 0
v.CommStatus = "onLine"
v.LastCommRTC = time.Now().Format("2006-01-02 15:04:05")
rxTotalBufCnt = 0
rxTotalBuf = rxTotalBuf[0:0]
goto LoopEmergencyStep
}
//是否接收超时
case <-timerOut.C:
{
CommunicationMessage := CommunicationMessageTemplate{
CollName: c.CollInterface.CollInterfaceName,
TimeStamp: time.Now().Format("2006-01-02 15:04:05"),
Direction: "receive",
Content: fmt.Sprintf("%X", rxTotalBuf),
}
if len(c.CollInterface.CommMessage) < 1024 {
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage, CommunicationMessage)
} else {
c.CollInterface.CommMessage = c.CollInterface.CommMessage[1:]
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage, CommunicationMessage)
}
log.Printf("%v,rx timeout\n", c.CollInterface.CollInterfaceName)
log.Printf("%v:rxbuf %X\n", c.CollInterface.CollInterfaceName, rxTotalBuf)
//通信帧延时
time.Sleep(time.Duration(interval) * time.Millisecond)
v.CurCommFailCnt++
if v.CurCommFailCnt >= c.CollInterface.OfflinePeriod {
v.CurCommFailCnt = 0
v.CommStatus = "offLine"
}
rxTotalBufCnt = 0
rxTotalBuf = rxTotalBuf[0:0]
goto LoopEmergencyStep
}
//继续接收数据
case rxBuf = <-c.PacketChan:
{
rxBufCnt = len(rxBuf)
if rxBufCnt > 0 {
rxTotalBufCnt += rxBufCnt
//追加接收的数据到接收缓冲区
rxTotalBuf = append(rxTotalBuf, rxBuf[:rxBufCnt]...)
//清除本次接收数据
rxBufCnt = 0
rxBuf = rxBuf[0:0]
//log.Printf("rxTotalBuf %X\n", rxTotalBuf)
}
}
}
}
LoopEmergencyStep:
}
LoopEmergency:
}
}
c.EmergencyAckChan <- status
@ -154,178 +246,179 @@ func (c *CommunicationManageTemplate)CommunicationManageDel() {
select {
case cmd := <-c.CommonRequestChan:
{
//setting.Logrus.Printf("%v:common chan\n",c.CollInterfaceName)
setting.Logger.WithFields(logrus.Fields{
"collName": c.CollInterface.CollInterfaceName,
}).Info("common chan")
for _, coll := range CollectInterfaceMap {
if coll.CollInterfaceName == cmd.CollInterfaceName {
for _,v := range coll.DeviceNodeMap{
if v.Addr == cmd.DeviceAddr {
log.Printf("%v:addr %v\n", coll.CollInterfaceName,v.Addr)
step := 0
for{
//--------------组包---------------------------
txBuf,ok := v.GenerateGetRealVariables(v.Addr,step)
if ok == false{
log.Printf("%v:getVariables finish\n",coll.CollInterfaceName)
goto LoopCommon
}
step++
log.Printf("%v:txbuf %X\n", coll.CollInterfaceName,txBuf)
CommunicationMessage := CommunicationMessageTemplate{
CollName: coll.CollInterfaceName,
TimeStamp: time.Now().Format("2006-01-02 15:04:05.999"),
Direction: "send",
Content: fmt.Sprintf("%X",txBuf),
}
if len(c.CollInterface.CommMessage) < 1024{
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage,CommunicationMessage)
}else{
c.CollInterface.CommMessage = c.CollInterface.CommMessage[1:]
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage,CommunicationMessage)
}
//---------------发送-------------------------
var timeout int
var interval int
//判断是否是串口采集
for _,v := range CommunicationSerialMap{
if v.Name == coll.CommInterfaceName{
v.WriteData(txBuf)
timeout,_ = strconv.Atoi(v.Param.Timeout)
interval,_ = strconv.Atoi(v.Param.Interval)
}
}
v.CommTotalCnt++
//---------------等待接收----------------------
//阻塞读
rxBuf := make([]byte, 256)
rxTotalBuf := make([]byte, 0)
rxBufCnt := 0
rxTotalBufCnt := 0
timerOut := time.NewTimer(time.Duration(timeout) * time.Millisecond)
for {
select {
//是否正确收到数据包
case <-v.AnalysisRx(v.Addr, v.VariableMap, rxTotalBuf, rxTotalBufCnt):
{
log.Printf("%v:rx ok\n",coll.CollInterfaceName)
log.Printf("%v:rxbuf %X\n", coll.CollInterfaceName,rxTotalBuf)
CommunicationMessage := CommunicationMessageTemplate{
CollName: coll.CollInterfaceName,
TimeStamp: time.Now().Format("2006-01-02 15:04:05.999"),
Direction: "receive",
Content: fmt.Sprintf("%X",rxTotalBuf),
}
if len(c.CollInterface.CommMessage) < 1024{
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage,CommunicationMessage)
}else{
c.CollInterface.CommMessage = c.CollInterface.CommMessage[1:]
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage,CommunicationMessage)
}
//通信帧延时
time.Sleep(time.Duration(interval)*time.Millisecond)
v.CommSuccessCnt++
v.CurCommFailCnt = 0
v.CommStatus = "onLine"
v.LastCommRTC = time.Now().Format("2006-01-02 15:04:05.999")
rxTotalBufCnt = 0
rxTotalBuf = rxTotalBuf[0:0]
goto LoopCommonStep
}
//是否接收超时
case <-timerOut.C:
{
CommunicationMessage := CommunicationMessageTemplate{
CollName: coll.CollInterfaceName,
TimeStamp: time.Now().Format("2006-01-02 15:04:05.999"),
Direction: "receive",
Content: fmt.Sprintf("%X",rxTotalBuf),
}
if len(c.CollInterface.CommMessage) < 1024{
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage,CommunicationMessage)
}else{
c.CollInterface.CommMessage = c.CollInterface.CommMessage[1:]
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage,CommunicationMessage)
}
log.Printf("%v,rx timeout\n",coll.CollInterfaceName)
//通信帧延时
time.Sleep(time.Duration(interval)*time.Millisecond)
v.CurCommFailCnt++
if v.CurCommFailCnt >= coll.OfflinePeriod{
v.CurCommFailCnt = 0
v.CommStatus = "offLine"
}
rxTotalBufCnt = 0
rxTotalBuf = rxTotalBuf[0:0]
goto LoopCommonStep
}
//继续接收数据
default:
{
for _,v := range CommunicationSerialMap{
if v.Name == coll.CommInterfaceName{
rxBufCnt = v.ReadData(rxBuf)
}
}
if rxBufCnt > 0 {
rxTotalBufCnt += rxBufCnt
//追加接收的数据到接收缓冲区
rxTotalBuf = append(rxTotalBuf, rxBuf[:rxBufCnt]...)
//清除本次接收数据
rxBufCnt = 0
//log.Printf("rxbuf %X\n",rxTotalBuf)
}
}
}
}
LoopCommonStep:
//for _, coll := range CollectInterfaceMap {
// if coll.CollInterfaceName == cmd.CollInterfaceName {
for _, v := range c.CollInterface.DeviceNodeMap {
if v.Addr == cmd.DeviceAddr {
log.Printf("%v:addr %v\n", c.CollInterface.CollInterfaceName, v.Addr)
step := 0
for {
//--------------组包---------------------------
if cmd.FunName == "GetDeviceRealVariables" {
txBuf, ok, con := v.GenerateGetRealVariables(v.Addr, step)
if ok == false {
log.Printf("%v:getVariables finish\n", c.CollInterface.CollInterfaceName)
goto LoopCommon
}
step++
log.Printf("%v:txbuf %X\n", c.CollInterface.CollInterfaceName, txBuf)
CommunicationMessage := CommunicationMessageTemplate{
CollName: c.CollInterface.CollInterfaceName,
TimeStamp: time.Now().Format("2006-01-02 15:04:05"),
Direction: "send",
Content: fmt.Sprintf("%X", txBuf),
}
if len(c.CollInterface.CommMessage) < 1024 {
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage, CommunicationMessage)
} else {
c.CollInterface.CommMessage = c.CollInterface.CommMessage[1:]
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage, CommunicationMessage)
}
//---------------发送-------------------------
var timeout int
var interval int
//判断是否是串口采集
for _, v := range CommunicationSerialMap {
if v.Name == c.CollInterface.CommInterfaceName {
v.WriteData(txBuf)
timeout, _ = strconv.Atoi(v.Param.Timeout)
interval, _ = strconv.Atoi(v.Param.Interval)
}
}
v.CommTotalCnt++
//---------------等待接收----------------------
//阻塞读
rxBuf := make([]byte, 256)
rxTotalBuf := make([]byte, 0)
rxBufCnt := 0
rxTotalBufCnt := 0
timerOut := time.NewTimer(time.Duration(timeout) * time.Millisecond)
for {
select {
//是否正确收到数据包
case <-v.AnalysisRx(v.Addr, v.VariableMap, rxTotalBuf, rxTotalBufCnt):
{
log.Printf("%v:rx ok\n", c.CollInterface.CollInterfaceName)
log.Printf("%v:rxbuf %X\n", c.CollInterface.CollInterfaceName, rxTotalBuf)
CommunicationMessage := CommunicationMessageTemplate{
CollName: c.CollInterface.CollInterfaceName,
TimeStamp: time.Now().Format("2006-01-02 15:04:05"),
Direction: "receive",
Content: fmt.Sprintf("%X", rxTotalBuf),
}
if len(c.CollInterface.CommMessage) < 1024 {
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage, CommunicationMessage)
} else {
c.CollInterface.CommMessage = c.CollInterface.CommMessage[1:]
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage, CommunicationMessage)
}
if con == true {
//通信帧延时
time.Sleep(time.Duration(interval) * time.Millisecond)
}
v.CommSuccessCnt++
v.CurCommFailCnt = 0
v.CommStatus = "onLine"
v.LastCommRTC = time.Now().Format("2006-01-02 15:04:05")
rxTotalBufCnt = 0
rxTotalBuf = rxTotalBuf[0:0]
goto LoopCommonStep
}
//是否接收超时
case <-timerOut.C:
{
CommunicationMessage := CommunicationMessageTemplate{
CollName: c.CollInterface.CollInterfaceName,
TimeStamp: time.Now().Format("2006-01-02 15:04:05"),
Direction: "receive",
Content: fmt.Sprintf("%X", rxTotalBuf),
}
if len(c.CollInterface.CommMessage) < 1024 {
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage, CommunicationMessage)
} else {
c.CollInterface.CommMessage = c.CollInterface.CommMessage[1:]
c.CollInterface.CommMessage = append(c.CollInterface.CommMessage, CommunicationMessage)
}
log.Printf("%v,rx timeout\n", c.CollInterface.CollInterfaceName)
log.Printf("%v:rxbuf %X\n", c.CollInterface.CollInterfaceName, rxTotalBuf)
//通信帧延时
time.Sleep(time.Duration(interval) * time.Millisecond)
v.CurCommFailCnt++
if v.CurCommFailCnt >= c.CollInterface.OfflinePeriod {
v.CurCommFailCnt = 0
v.CommStatus = "offLine"
}
rxTotalBufCnt = 0
rxTotalBuf = rxTotalBuf[0:0]
goto LoopCommonStep
}
//继续接收数据
case rxBuf = <-c.PacketChan:
{
rxBufCnt = len(rxBuf)
if rxBufCnt > 0 {
rxTotalBufCnt += rxBufCnt
//追加接收的数据到接收缓冲区
rxTotalBuf = append(rxTotalBuf, rxBuf[:rxBufCnt]...)
//清除本次接收数据
rxBufCnt = 0
rxBuf = rxBuf[0:0]
//log.Printf("rxTotalBuf %X\n", rxTotalBuf)
}
}
}
}
LoopCommon:
}
}
coll.DeviceNodeOnlineCnt = 0
for _,v := range coll.DeviceNodeMap{
if v.CommStatus == "onLine"{
coll.DeviceNodeOnlineCnt++
}
LoopCommonStep:
}
LoopCommon:
}
}
c.CollInterface.DeviceNodeOnlineCnt = 0
for _, v := range c.CollInterface.DeviceNodeMap {
if v.CommStatus == "onLine" {
c.CollInterface.DeviceNodeOnlineCnt++
}
}
//}
//}
//更新设备在线率
deviceTotalCnt := 0
deviceOnlineCnt := 0
for _,v := range CollectInterfaceMap{
for _, v := range CollectInterfaceMap {
deviceTotalCnt += v.DeviceNodeCnt
deviceOnlineCnt += v.DeviceNodeOnlineCnt
}
if deviceOnlineCnt == 0{
if deviceOnlineCnt == 0 {
setting.SystemState.DeviceOnline = "0"
}else{
setting.SystemState.DeviceOnline = fmt.Sprintf("%2.1f",float32(deviceOnlineCnt*100.0/deviceTotalCnt))
} else {
setting.SystemState.DeviceOnline = fmt.Sprintf("%2.1f", float32(deviceOnlineCnt*100.0/deviceTotalCnt))
}
//更新设备丢包率
deviceCommTotalCnt := 0
deviceCommLossCnt := 0
for _,v := range CollectInterfaceMap{
for _,v := range v.DeviceNodeMap{
for _, v := range CollectInterfaceMap {
for _, v := range v.DeviceNodeMap {
deviceCommTotalCnt += v.CommTotalCnt
deviceCommLossCnt += v.CommTotalCnt-v.CommSuccessCnt
deviceCommLossCnt += v.CommTotalCnt - v.CommSuccessCnt
}
}
if deviceCommLossCnt == 0{
if deviceCommLossCnt == 0 {
setting.SystemState.DevicePacketLoss = "0"
}else{
setting.SystemState.DevicePacketLoss = fmt.Sprintf("%2.1f",float32(deviceCommLossCnt*100.0/deviceCommTotalCnt))
} else {
setting.SystemState.DevicePacketLoss = fmt.Sprintf("%2.1f", float32(deviceCommLossCnt*100.0/deviceCommTotalCnt))
}
}
default:
@ -336,14 +429,14 @@ func (c *CommunicationManageTemplate)CommunicationManageDel() {
}
}
func (c *CommunicationManageTemplate)CommunicationManagePoll() {
func (c *CommunicationManageTemplate) CommunicationManagePoll() {
cmd := CommunicationCmdTemplate{}
//对采集接口进行遍历
for _, coll := range CollectInterfaceMap {
if coll.CollInterfaceName == c.CollInterface.CollInterfaceName {
//对采集接口下设备进行遍历
for _,v := range coll.DeviceNodeMap{
for _, v := range coll.DeviceNodeMap {
cmd.CollInterfaceName = coll.CollInterfaceName
cmd.DeviceAddr = v.Addr
cmd.FunName = "GetDeviceRealVariables"

View File

@ -3,66 +3,66 @@ package device
import (
"github.com/yuin/gluamapper"
lua "github.com/yuin/gopher-lua"
"goAdapter/setting"
"layeh.com/gopher-luar"
"time"
"goAdapter/setting"
)
var MaxDeviceNodeCnt int = 50
type ValueTemplate struct{
Value interface{} //变量值,不可以是字符串
Explain interface{} //变量值解释,必须是字符串
TimeStamp string
type ValueTemplate struct {
Value interface{} //变量值,不可以是字符串
Explain interface{} //变量值解释,必须是字符串
TimeStamp string
}
//变量标签模版
type VariableTemplate struct{
Index int `json:"index"` //变量偏移量
Name string `json:"name"` //变量名
Label string `json:"lable"` //变量标签
Value []ValueTemplate `json:"value"` //变量值
Type string `json:"type"` //变量类型
type VariableTemplate struct {
Index int `json:"index"` //变量偏移量
Name string `json:"name"` //变量名
Label string `json:"lable"` //变量标签
Value []ValueTemplate `json:"value"` //变量值
Type string `json:"type"` //变量类型
}
//设备模板
type DeviceNodeTemplate struct {
Index int `json:"Index"` //设备偏移量
Name string `json:"Name"` //设备名称
Addr string `json:"Addr"` //设备地址
Type string `json:"Type"` //设备类型
LastCommRTC string `json:"LastCommRTC"` //最后一次通信时间戳
CommTotalCnt int `json:"CommTotalCnt"` //通信总次数
CommSuccessCnt int `json:"CommSuccessCnt"` //通信成功次数
CurCommFailCnt int `json:"-"` //当前通信失败次数
CommStatus string `json:"CommStatus"` //通信状态
VariableMap []VariableTemplate `json:"-"` //变量列表
Index int `json:"Index"` //设备偏移量
Name string `json:"Name"` //设备名称
Addr string `json:"Addr"` //设备地址
Type string `json:"Type"` //设备类型
LastCommRTC string `json:"LastCommRTC"` //最后一次通信时间戳
CommTotalCnt int `json:"CommTotalCnt"` //通信总次数
CommSuccessCnt int `json:"CommSuccessCnt"` //通信成功次数
CurCommFailCnt int `json:"-"` //当前通信失败次数
CommStatus string `json:"CommStatus"` //通信状态
VariableMap []VariableTemplate `json:"-"` //变量列表
}
func (d *DeviceNodeTemplate) NewVariables() []VariableTemplate {
type LuaVariableTemplate struct{
type LuaVariableTemplate struct {
Index int
Name string
Name string
Label string
Type string
Type string
}
type LuaVariableMapTemplate struct{
type LuaVariableMapTemplate struct {
Variable []*LuaVariableTemplate
}
for k,v := range DeviceNodeTypeMap.DeviceNodeType{
if d.Type == v.TemplateType{
for k, v := range DeviceNodeTypeMap.DeviceNodeType {
if d.Type == v.TemplateType {
//调用NewVariables
err := DeviceTypePluginMap[k].CallByParam(lua.P{
Fn:DeviceTypePluginMap[k].GetGlobal("NewVariables"),
NRet:1,
Fn: DeviceTypePluginMap[k].GetGlobal("NewVariables"),
NRet: 1,
Protect: true,
})
if err != nil{
setting.Logger.Warning("NewVariables err,",err)
if err != nil {
setting.Logger.Warning("NewVariables err,", err)
}
//获取返回结果
@ -72,20 +72,20 @@ func (d *DeviceNodeTemplate) NewVariables() []VariableTemplate {
LuaVariableMap := LuaVariableMapTemplate{}
if err := gluamapper.Map(ret.(*lua.LTable), &LuaVariableMap); err != nil {
setting.Logger.Warning("NewVariables gluamapper.Map err,",err)
setting.Logger.Warning("NewVariables gluamapper.Map err,", err)
}
variables := make([]VariableTemplate,0)
variables := make([]VariableTemplate, 0)
for _,v := range LuaVariableMap.Variable{
for _, v := range LuaVariableMap.Variable {
variable := VariableTemplate{}
variable.Index = v.Index
variable.Name = v.Name
variable.Label = v.Label
variable.Type = v.Type
variable.Value = make([]ValueTemplate,0)
variables = append(variables,variable)
variable.Value = make([]ValueTemplate, 0)
variables = append(variables, variable)
}
return variables
}
@ -101,23 +101,24 @@ func (d *DeviceNodeTemplate) NewVariables() []VariableTemplate {
return nil
}
func (d *DeviceNodeTemplate) GenerateGetRealVariables(sAddr string,step int) ([]byte,bool) {
func (d *DeviceNodeTemplate) GenerateGetRealVariables(sAddr string, step int) ([]byte, bool, bool) {
type LuaVariableMapTemplate struct{
type LuaVariableMapTemplate struct {
Status string `json:"Status"`
Variable []*byte
}
for k,v := range DeviceNodeTypeMap.DeviceNodeType {
for k, v := range DeviceNodeTypeMap.DeviceNodeType {
if d.Type == v.TemplateType {
//调用NewVariables
err := DeviceTypePluginMap[k].CallByParam(lua.P{
Fn:DeviceTypePluginMap[k].GetGlobal("GenerateGetRealVariables"),
NRet:1,
Fn: DeviceTypePluginMap[k].GetGlobal("GenerateGetRealVariables"),
NRet: 1,
Protect: true,
},lua.LString(sAddr),lua.LNumber(step))
if err != nil{
setting.Logger.Warning("GenerateGetRealVariables err,",err)
}, lua.LString(sAddr), lua.LNumber(step))
if err != nil {
setting.Logger.Warning("GenerateGetRealVariables err,", err)
}
//获取返回结果
@ -126,22 +127,27 @@ func (d *DeviceNodeTemplate) GenerateGetRealVariables(sAddr string,step int) ([]
LuaVariableMap := LuaVariableMapTemplate{}
if err := gluamapper.Map(ret.(*lua.LTable), &LuaVariableMap); err != nil {
setting.Logger.Warning("GenerateGetRealVariables gluamapper.Map err,",err)
setting.Logger.Warning("GenerateGetRealVariables gluamapper.Map err,", err)
}
ok := false
nBytes := make([]byte,0)
if len(LuaVariableMap.Variable) > 0{
con := false //后续是否有报文
if LuaVariableMap.Status == "0" {
con = false
} else {
con = true
}
nBytes := make([]byte, 0)
if len(LuaVariableMap.Variable) > 0 {
ok = true
for _,v := range LuaVariableMap.Variable{
nBytes = append(nBytes,*v)
for _, v := range LuaVariableMap.Variable {
nBytes = append(nBytes, *v)
}
}else{
} else {
ok = false
}
return nBytes,ok
return nBytes, ok, con
}
}
@ -152,14 +158,92 @@ func (d *DeviceNodeTemplate) GenerateGetRealVariables(sAddr string,step int) ([]
// return nBytes,ok
// }
//}
return nil,false
return nil, false, false
}
func (d *DeviceNodeTemplate) AnalysisRx(sAddr string,variables []VariableTemplate,rxBuf []byte,rxBufCnt int) chan bool{
func (d *DeviceNodeTemplate) DeviceCustomCmd(sAddr string, cmdName string, cmdParam string, step int) ([]byte, bool, bool) {
status := make(chan bool,1)
type LuaVariableMapTemplate struct {
Status string `json:"Status"`
Variable []*byte `json:"Variable"`
}
type LuaVariableTemplate struct{
//log.Printf("cmdParam %+v\n", cmdParam)
for k, v := range DeviceNodeTypeMap.DeviceNodeType {
if d.Type == v.TemplateType {
var err error
var ret lua.LValue
////调用json
//err = DeviceTypePluginMap[k].CallByParam(lua.P{
// Fn: DeviceTypePluginMap[k].GetGlobal("encode"),
// NRet: 1,
// Protect: true,
//}, lua.LString(cmdParam))
//if err != nil {
// setting.Logger.Warning("json.encode err,", err)
// return nil, false, false
//}
//
////获取返回结果
//ret = DeviceTypePluginMap[k].Get(-1)
//DeviceTypePluginMap[k].Pop(1)
//// 打印结果
////res, _ := ret.(*lua.LTable)
//调用DeviceCustomCmd
err = DeviceTypePluginMap[k].CallByParam(lua.P{
Fn: DeviceTypePluginMap[k].GetGlobal("DeviceCustomCmd"),
NRet: 1,
Protect: true,
}, lua.LString(sAddr),
lua.LString(cmdName),
lua.LString(cmdParam),
lua.LNumber(step))
if err != nil {
setting.Logger.Warning("DeviceCustomCmd err,", err)
return nil, false, false
}
//获取返回结果
ret = DeviceTypePluginMap[k].Get(-1)
DeviceTypePluginMap[k].Pop(1)
LuaVariableMap := LuaVariableMapTemplate{}
if err := gluamapper.Map(ret.(*lua.LTable), &LuaVariableMap); err != nil {
setting.Logger.Warning("DeviceCustomCmd gluamapper.Map err,", err)
return nil, false, false
}
ok := false
con := false //后续是否有报文
if LuaVariableMap.Status == "0" {
con = false
} else {
con = true
}
nBytes := make([]byte, 0)
if len(LuaVariableMap.Variable) > 0 {
ok = true
for _, v := range LuaVariableMap.Variable {
nBytes = append(nBytes, *v)
}
} else {
ok = false
}
return nBytes, ok, con
}
}
return nil, false, false
}
func (d *DeviceNodeTemplate) AnalysisRx(sAddr string, variables []VariableTemplate, rxBuf []byte, rxBufCnt int) chan bool {
status := make(chan bool, 1)
type LuaVariableTemplate struct {
Index int
Name string
Label string
@ -168,28 +252,29 @@ func (d *DeviceNodeTemplate) AnalysisRx(sAddr string,variables []VariableTemplat
Explain string
}
type LuaVariableMapTemplate struct{
type LuaVariableMapTemplate struct {
Status string `json:"Status"`
Variable []*LuaVariableTemplate
}
for k,v := range DeviceNodeTypeMap.DeviceNodeType{
if d.Type == v.TemplateType{
for k, v := range DeviceNodeTypeMap.DeviceNodeType {
if d.Type == v.TemplateType {
tbl := lua.LTable{}
for _,v := range rxBuf{
for _, v := range rxBuf {
tbl.Append(lua.LNumber(v))
}
DeviceTypePluginMap[k].SetGlobal("rxBuf", luar.New(DeviceTypePluginMap[k], &tbl))
//调用NewVariables
//AnalysisRx
err := DeviceTypePluginMap[k].CallByParam(lua.P{
Fn:DeviceTypePluginMap[k].GetGlobal("AnalysisRx"),
NRet:1,
Fn: DeviceTypePluginMap[k].GetGlobal("AnalysisRx"),
NRet: 1,
Protect: true,
},lua.LString(sAddr),lua.LNumber(rxBufCnt))
if err != nil{
setting.Logger.Warning("AnalysisRx err,",err)
}, lua.LString(sAddr), lua.LNumber(rxBufCnt))
if err != nil {
setting.Logger.Warning("AnalysisRx err,", err)
}
//获取返回结果
@ -199,40 +284,39 @@ func (d *DeviceNodeTemplate) AnalysisRx(sAddr string,variables []VariableTemplat
LuaVariableMap := LuaVariableMapTemplate{}
if err := gluamapper.Map(ret.(*lua.LTable), &LuaVariableMap); err != nil {
setting.Logger.Warning("AnalysisRx gluamapper.Map err,",err)
setting.Logger.Warning("AnalysisRx gluamapper.Map err,", err)
}
timeNowStr := time.Now().Format("2006-01-02 15:04:05")
value := ValueTemplate{}
if len(LuaVariableMap.Variable) > 0{
for _,lv := range LuaVariableMap.Variable{
for k,v := range variables{
if lv.Index == v.Index{
//log.Printf("LuaVariableMap %+v\n", LuaVariableMap)
if LuaVariableMap.Status == "0" {
if len(LuaVariableMap.Variable) > 0 {
for _, lv := range LuaVariableMap.Variable {
for k, v := range variables {
if lv.Index == v.Index {
variables[k].Index = lv.Index
variables[k].Name = lv.Name
variables[k].Label = lv.Label
variables[k].Type = lv.Type
variables[k].Index = lv.Index
variables[k].Name = lv.Name
variables[k].Label = lv.Label
variables[k].Type = lv.Type
value.Value = lv.Value
value.Explain = lv.Explain
value.TimeStamp = timeNowStr
value.Value = lv.Value
value.Explain = lv.Explain
value.TimeStamp = timeNowStr
variables[k].Value = append(variables[k].Value,value)
if len(variables[k].Value) < 100 {
variables[k].Value = append(variables[k].Value, value)
} else {
variables[k].Value = variables[k].Value[1:]
variables[k].Value = append(variables[k].Value, value)
}
}
}
}
}
status <-true
status <- true
}
}
}
//for k,v := range DeviceNodeTypeMap.DeviceNodeType {
// if d.Type == v.TemplateType {
// analysisRxFun, _ := DeviceTypePluginMap[k].Lookup("AnalysisRx")
// status = analysisRxFun.(func(string,[]VariableTemplate,[]byte, int) chan bool)(sAddr, variables, rxBuf, rxBufCnt)
// return status
// }
//}
return status
}
}

View File

@ -11,7 +11,11 @@ import (
var Logger = logrus.New()
func LogerInit(level string,save bool,cnt uint){
func init() {
LogerInit(LogLevel, LogSaveToFile, LogFileMaxCnt)
}
func LogerInit(level string, save bool, cnt uint) {
//log输出行号和ms
//log.SetFlags(log.Lshortfile | log.Ldate | log.Lmicroseconds)
@ -20,7 +24,7 @@ func LogerInit(level string,save bool,cnt uint){
Logger.Formatter = &logrus.JSONFormatter{}
//fmt.Printf("level %v\n",level)
//fmt.Printf("save %v\n",save)
if save == true{
if save == true {
exeCurDir, _ := filepath.Abs(filepath.Dir(os.Args[0]))
@ -39,13 +43,13 @@ func LogerInit(level string,save bool,cnt uint){
rotatelogs.WithRotationCount(cnt),
rotatelogs.WithRotationTime(time.Hour),
)
if err != nil{
if err != nil {
fmt.Println(err)
}
//Loger.SetOutput(writer)
Logger.Out = writer
}else{
} else {
// 设置将日志输出到标准输出默认的输出为stderr标准错误
// 日志消息输出可以是任意的io.writer类型
Logger.SetOutput(os.Stdout)