修改:1、实现了corn定时 2、优化系统状态获取,定时获取设备在线率、丢包率等

This commit is contained in:
pengwang 2020-08-18 14:54:06 +08:00
parent a8fed1e42d
commit 7b888a3ad2
5 changed files with 187 additions and 216 deletions

View File

@ -60,10 +60,11 @@ func main() {
for _,v := range device.CollectInterfaceMap{
CommunicationManage := device.NewCommunicationManageTemplate()
CommunicationManage.CollInterfaceName = v.CollInterfaceName
str := fmt.Sprintf("/%d */%d * * * ?",v.PollPeriod%60,v.PollPeriod/60)
str := fmt.Sprintf("@every %dm%ds",v.PollPeriod/60,v.PollPeriod%60)
log.Printf("str %+v\n",str)
cronGetNetStatus.AddFunc("*/60 * * * * *", CommunicationManage.CommunicationManagePoll)
//cronGetNetStatus.AddFunc("10 */1 * * * *", CommunicationManage.CommunicationManagePoll)
cronGetNetStatus.AddFunc(str, CommunicationManage.CommunicationManagePoll)
go CommunicationManage.CommunicationManageDel()
}
@ -78,7 +79,7 @@ func main() {
cronGetNetStatus.Start()
defer cronGetNetStatus.Stop()
mqttClient.MqttAppConnect()
mqttClient.MQTTClient_Init()
/**************httpserver初始化****************/
// 默认启动方式,包含 Logger、Recovery 中间件

View File

@ -37,7 +37,7 @@ func Calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp st
var MQTTClientId bytes.Buffer
MQTTClientId.WriteString(clientId)
// hmac, use sha1; securemode=2 means TLS connection
MQTTClientId.WriteString("|securemode=3,_v=paho-go-1.0.0,signmethod=hmacsha1,timestamp=")
MQTTClientId.WriteString("|securemode=3,signmethod=hmacsha1,timestamp=")
MQTTClientId.WriteString(timeStamp)
MQTTClientId.WriteString("|")

145
mqttClient/mqttAliyun.go Normal file
View File

@ -0,0 +1,145 @@
package mqttClient
import (
"bytes"
"crypto/tls"
"crypto/x509"
"fmt"
MQTT "github.com/eclipse/paho.mqtt.golang"
"io/ioutil"
"log"
"os"
"time"
)
var (
productKey string = "a1oSllgBCjt"
deviceName string = "1111"
deviceSecret string = "2d7d200249a49568cfbdace0900e6dcd"
clientId string = "1111"
timeStamp string = "1528018257135"
)
var (
subTopic string = "/" + productKey + "/" + deviceName + "/user/get"
pubTopic string = "/" + productKey + "/" + deviceName + "/user/update"
registerTopic string = "/" + "ext/session/" + "a1Hhs4E2xUG" + "/" + "2001" + "/combine/login"
)
// define a function for the default message handler
var publishHandler MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}
func MQTTClient_Init() {
// set the login broker url
var raw_broker bytes.Buffer
raw_broker.WriteString("tls://")
raw_broker.WriteString(productKey)
raw_broker.WriteString(".iot-as-mqtt.cn-shanghai.aliyuncs.com:1883")
opts := MQTT.NewClientOptions().AddBroker(raw_broker.String())
// calculate the login auth info, and set it into the connection options
auth := Calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp)
opts.SetClientID(auth.mqttClientId)
opts.SetUsername(auth.username)
opts.SetPassword(auth.password)
opts.SetKeepAlive(60 * 2 * time.Second)
opts.SetDefaultPublishHandler(publishHandler)
// set the tls configuration
//tlsconfig := NewTLSConfig()
//opts.SetTLSConfig(tlsconfig)
// create and start a client using the above ClientOptions
c := MQTT.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
log.Printf("Connect aliyun IoT Cloud Sucess\n")
// subscribe to subTopic("/a1Zd7n5yTt8/deng/user/get") and request messages to be delivered
if token := c.Subscribe(subTopic, 0, nil); token.Wait() && token.Error() != nil {
log.Println(token.Error())
os.Exit(1)
}
log.Printf("Subscribe topic " + subTopic + " success\n")
//// publish 5 messages to pubTopic("/a1Zd7n5yTt8/deng/user/update")
//for i := 0; i < 5; i++ {
// log.Println("publish msg:", i)
// text := fmt.Sprintf("ABC #%d", i)
// token := c.Publish(pubTopic, 0, false, text)
// log.Println("publish msg: ", text)
// token.Wait()
// time.Sleep(2 * time.Second)
//}
authDevice := Calculate_sign("2001", "a1Hhs4E2xUG", "2001", "470c8262d6cee4bf440cf66758c2f3b8", timeStamp)
log.Printf("authDevice %v\n",authDevice.password)
registerText := `{
"id": "123",
"params": {
"productKey": "a1Hhs4E2xUG",
"deviceName": "2001",
"clientId": "2001",
"timestamp": "1528018257135",
"signMethod": "hmacsha1",
"sign": "edabfd45469dec12cf3432dd6083c58dadffb4ca",
"cleanSession": "true"
}
}`
token := c.Publish(registerTopic, 0, false, registerText)
log.Println("publish msg: ", registerText)
token.Wait()
/*
// unsubscribe from subTopic("/a1Zd7n5yTt8/deng/user/get")
if token := c.Unsubscribe(subTopic);token.Wait() && token.Error() != nil {
log.Println(token.Error())
os.Exit(1)
}
c.Disconnect(250)
*/
}
func NewTLSConfig() *tls.Config {
// Import trusted certificates from CAfile.pem.
// Alternatively, manually add CA certificates to default openssl CA bundle.
certpool := x509.NewCertPool()
pemCerts, err := ioutil.ReadFile("./x509/root.pem")
if err != nil {
fmt.Println("0. read file error, game over!!")
}
certpool.AppendCertsFromPEM(pemCerts)
// Create tls.Config with desired tls properties
return &tls.Config{
// RootCAs = certs used to verify server cert.
RootCAs: certpool,
// ClientAuth = whether to request cert from server.
// Since the server is set up for SSL, this happens
// anyways.
ClientAuth: tls.NoClientCert,
// ClientCAs = certs used to validate client cert.
ClientCAs: nil,
// InsecureSkipVerify = verify that cert contents
// match server. IP matches what is in cert etc.
InsecureSkipVerify: false,
// Certificates = list of certs client sends to server.
// Certificates: []tls.Certificate{cert},
}
}

View File

@ -1,210 +0,0 @@
package mqttClient
import (
"fmt"
"github.com/eclipse/paho.mqtt.golang"
aiot "github.com/thinkgos/aliyun-iot"
"github.com/thinkgos/aliyun-iot/dm"
"github.com/thinkgos/aliyun-iot/infra"
"github.com/thinkgos/aliyun-iot/sign"
"log"
"math/rand"
)
const (
productKey = "a1oSllgBCjt"
productSecret = ""
deviceName = "1111"
deviceSecret = "2d7d200249a49568cfbdace0900e6dcd"
)
var dmClient *aiot.MQTTClient
type Properties struct{
Value interface{} `json:value`
}
type ServiceContent struct{
ServiceID string `json:"service_id"`
Properties Properties `json:"properties"`
}
type PublishContent struct{
Service []ServiceContent `json:"services"`
}
type AliYunMqttClientTemplate struct{
ClientId string `json:"ClientID"`
DeviceName string `json:"DeviceName"`
ProductKey string `json:"ProductKey"`
DeviceSecret string `json:"DeviceSecret"`
TimeStamp string `json:"TimeStamp"`
}
var mClient mqtt.Client
var AliYunMqttClient = AliYunMqttClientTemplate{
ClientId : "1111|securemode=3,signmethod=hmacsha1|",
DeviceName : "1111",
ProductKey : "a1oSllgBCjt",
DeviceSecret : "2d7d200249a49568cfbdace0900e6dcd",
TimeStamp : "1528018257135",
}
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
log.Printf("Pub Client Topic : %s \n", msg.Topic())
log.Printf("Pub Client msg : %s \n", msg.Payload())
}
//创建全局mqtt sub消息处理 handler
var messageSubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
log.Printf("Sub Client Topic : %s \n", msg.Topic())
log.Printf("Sub Client msg : %s \n", msg.Payload())
}
func MqttAppConnect() {
//设备三元信息
metaInfo := &infra.MetaInfo{
ProductKey: productKey,
ProductSecret: productSecret,
DeviceName: deviceName,
DeviceSecret: deviceSecret,
}
//设备域名信息
cloudRegionDomain := infra.CloudRegionDomain{
Region: infra.CloudRegionShangHai,
}
mqttSign := sign.NewMQTTSign()
mqttSign.SetSDKVersion(infra.IOTSDKVersion)
mqttSignInfo,err := mqttSign.Generate(metaInfo,cloudRegionDomain)
if err != nil {
panic(err)
}
clientOpts := mqtt.NewClientOptions()
clientOpts = clientOpts.AddBroker(fmt.Sprintf("%s:%d", mqttSignInfo.HostName, mqttSignInfo.Port))
clientOpts.SetClientID(mqttSignInfo.ClientID)
clientOpts.SetUsername(mqttSignInfo.UserName)
clientOpts.SetPassword(mqttSignInfo.Password)
clientOpts.SetCleanSession(true)
clientOpts.SetAutoReconnect(true)
clientOpts.SetOnConnectHandler(func(cli mqtt.Client) {
log.Println("mqtt client connection success")
})
clientOpts.SetConnectionLostHandler(func(cli mqtt.Client, err error) {
log.Println("mqtt client connection lost, ", err)
})
dmOpt := dm.NewConfig(productKey, deviceName, deviceSecret)
dmOpt = dmOpt.Valid()
dmClient = aiot.NewWithMQTT(dmOpt, mqtt.NewClient(clientOpts))
dmClient.LogMode(true)
dmClient.UnderlyingClient().Connect().Wait()
if err = dmClient.AlinkConnect(); err != nil {
panic(err)
}
/*
//mqtt.DEBUG = log.New(os.Stdout, "", 0)
mqtt.ERROR = log.New(os.Stdout, "", 0)
//opts := mqtt.NewClientOptions().AddBroker("iot-mqtts.cn-north-4.myhuaweicloud.com:1883")
//brokerStr := AliYunMqttClient.ProductKey + ".iot-as-mqtt.cn-shanghai.aliyuncs.com:1883"
//opts := mqtt.NewClientOptions().AddBroker(brokerStr)
var raw_broker bytes.Buffer
raw_broker.WriteString("tls://")
raw_broker.WriteString(AliYunMqttClient.ProductKey)
raw_broker.WriteString(".iot-as-mqtt.cn-shanghai.aliyuncs.com:1883")
opts := mqtt.NewClientOptions().AddBroker(raw_broker.String())
auth := Calculate_sign(AliYunMqttClient.ClientId,
AliYunMqttClient.ProductKey,
AliYunMqttClient.DeviceName,
AliYunMqttClient.DeviceSecret,
AliYunMqttClient.TimeStamp)
log.Printf("auth %+v\n",auth)
opts.SetClientID(auth.mqttClientId)
opts.SetUsername(auth.username)
opts.SetPassword(auth.password)
opts.SetKeepAlive(60 * 2 * time.Second)
opts.SetConnectTimeout(30*time.Second)
//opts.SetKeepAlive(120*time.Second)
opts.SetCleanSession(false)
//opts.SetProtocolVersion(3)
opts.SetDefaultPublishHandler(messagePubHandler)
mClient = mqtt.NewClient(opts)
if token := mClient.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
//订阅消息
subTopic := "/" + AliYunMqttClient.ProductKey + "/" + AliYunMqttClient.DeviceName + "/user/get";
token := mClient.Subscribe(subTopic, 0, messageSubHandler)
log.Printf("[Sub] end Subscribe msg to mqtt broker,token : %s \n", token)
//
//c.Disconnect(250)
*/
}
/*
func mqttAppPublish(){
var topic string = "$oc/devices/{" + "5ea67d5d58115909547f50e8_11111112" + "}/sys/properties/report"
var publishContent = PublishContent{}
serviceContent := make([]ServiceContent,0)
properties := Properties{
Value:23,
}
serviceContent = append(serviceContent, ServiceContent{
ServiceID: "Temp",
Properties: properties,
})
publishContent.Service = serviceContent
sJson,err := json.Marshal(publishContent)
if err != nil{
log.Println("publishContent json marshal err")
}
log.Println(string(sJson))
token := mClient.Publish(topic, 0, false, string(sJson))
token.Wait()
}
*/
func MqttAppPublish(){
publishParam := map[string]interface{}{
"Temp": rand.Intn(200),
"Humi": rand.Intn(100),
}
err := dmClient.AlinkReport(dm.MsgTypeEventPropertyPost,
dm.DevNodeLocal,
publishParam)
if err != nil {
log.Printf("error: %#v", err)
}
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/mem"
"goAdapter/device"
"os/exec"
"time"
)
@ -46,7 +47,7 @@ var SystemState = SystemStateTemplate{
SoftVer :"V0.0.1",
SystemRTC :"2020-05-26 12:00:00",
RunTime :"0",
DeviceOnline :"100",
DeviceOnline :"0",
DevicePacketLoss : "0",
}
@ -94,6 +95,30 @@ func GetDiskState(){
SystemState.DiskUse = fmt.Sprintf("%3.1f",v.UsedPercent)
}
func GetDeviceOnlineState(){
deviceTotalCnt := 0
deviceOnlineCnt := 0
for _,v := range device.CollectInterfaceMap{
deviceTotalCnt += v.DeviceNodeCnt
deviceOnlineCnt += v.DeviceNodeOnlineCnt
}
SystemState.DeviceOnline = fmt.Sprintf("%2.1f",float32(deviceOnlineCnt*100.0/deviceTotalCnt))
}
func GetDevicePacketLossState(){
deviceCommTotalCnt := 0
deviceCommLossCnt := 0
for _,v := range device.CollectInterfaceMap{
for _,v := range v.DeviceNodeMap{
deviceCommTotalCnt += v.CommTotalCnt
deviceCommLossCnt += v.CommTotalCnt-v.CommSuccessCnt
}
}
SystemState.DevicePacketLoss = fmt.Sprintf("%2.1f",float32(deviceCommLossCnt*100.0/deviceCommTotalCnt))
}
func GetTimeStart(){
timeStart = time.Now()
@ -125,7 +150,7 @@ func NewDataStreamTemplate(legend string) *DataStreamTemplate{
func (d *DataStreamTemplate)AddDataPoint(data DataPointTemplate){
if d.DataPointCnt < 300{
if d.DataPointCnt < 2880{
d.DataPoint = append(d.DataPoint,data)
d.DataPointCnt++
}else{
@ -138,6 +163,8 @@ func CollectSystemParam(){
GetMemState()
GetRunTime()
GetDeviceOnlineState()
GetDevicePacketLossState()
point := DataPointTemplate{}
@ -148,5 +175,13 @@ func CollectSystemParam(){
point.Value = SystemState.DiskUse
point.Time = SystemState.SystemRTC
DiskDataStream.AddDataPoint(point)
point.Value = SystemState.DeviceOnline
point.Time = SystemState.SystemRTC
DeviceOnlineDataStream.AddDataPoint(point)
point.Value = SystemState.DevicePacketLoss
point.Time = SystemState.SystemRTC
DevicePacketLossDataStream.AddDataPoint(point)
}