feat(storage): update single storage

This commit is contained in:
bandl 2021-10-05 16:41:17 +08:00
parent 125b137ef6
commit 212e025b23
6 changed files with 186 additions and 266 deletions

View File

@ -3,6 +3,7 @@ package cmd
import (
"fmt"
"gitee.com/timedb/wheatCache/storage/server/single"
"log"
"net"
"os"
@ -11,7 +12,6 @@ import (
_ "gitee.com/timedb/wheatCache/conf"
"gitee.com/timedb/wheatCache/pkg/proto"
"gitee.com/timedb/wheatCache/storage/server"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"google.golang.org/grpc"
@ -24,7 +24,7 @@ var rootCmd = &cobra.Command{
Short: "storage",
Long: `start storage server`,
Run: func(cmd *cobra.Command, args []string) {
storageServer := server.NewServer()
storageServer := single.NewServer()
// 先写死, 等配置文件
conf := viper.GetStringMap("storage")
host := conf["host"].(string)

View File

@ -1,27 +0,0 @@
package server
import (
"gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/proto"
"gitee.com/timedb/wheatCache/pkg/structure"
"gitee.com/timedb/wheatCache/pkg/structure/stringx"
"time"
)
type serverSingle struct {
middleProduce event.ProduceInterface
lruProduce event.ProduceInterface
ttl time.Duration
}
func NewServer() proto.CommServerServer {
ser := &serverSingle{}
return ser
}
// TODO 移除
func mockLruValue() structure.KeyBaseInterface {
return stringx.NewStringSingle()
}

View File

@ -0,0 +1,12 @@
package single
import (
"sync"
)
var (
oneSingleServer sync.Once
sysSingleServer *serverSingle
)
const timeOutDefault = 2

View File

@ -0,0 +1,38 @@
package single
import (
"gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/lru"
"gitee.com/timedb/wheatCache/storage/dao"
"github.com/spf13/viper"
"time"
)
type serverSingle struct {
middleProduce event.ProduceInterface
lruProduce event.ProduceInterface
timeOut time.Duration
lruCache *lru.SingleCache
dao *dao.Dao
}
func NewServer() *serverSingle {
oneSingleServer.Do(func() {
timeOut := viper.GetInt("storage.timeOut")
if timeOut == 0 {
timeOut = timeOutDefault
}
lruCache := lru.NewLRUCache()
ser := &serverSingle{
lruCache: lruCache,
lruProduce: event.NewProduce(lruCache.GetDriver()),
timeOut: time.Duration(timeOut),
dao: dao.NewDao(lruCache),
}
sysSingleServer = ser
})
return sysSingleServer
}

View File

@ -0,0 +1,134 @@
package single
import (
context "context"
"gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/lru"
"gitee.com/timedb/wheatCache/pkg/proto"
)
func (s *serverSingle) Set(
cxt context.Context,
req *proto.SetRequest,
) (*proto.SetResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.Set(req.Key.Key, req.Val)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
if err != nil {
return nil, err
}
return &proto.SetResponse{
Result: resp.(string),
}, nil
}
func (s *serverSingle) Get(
cxt context.Context,
req *proto.GetRequest,
) (*proto.GetResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.Get(req.Key.Key)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
if err != nil {
return nil, err
}
return &proto.GetResponse{
Result: resp.(string),
}, nil
}
func (s serverSingle) Add(
cxt context.Context,
req *proto.AddRequest,
) (*proto.AddResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.Add(req.Key.Key, req.Renewal)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
if err != nil {
return nil, err
}
return &proto.AddResponse{
Result: resp.(string),
}, nil
}
func (s *serverSingle) Reduce(
cxt context.Context,
req *proto.ReduceRequest,
) (*proto.ReduceResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.Add(req.Key.Key, req.Renewal)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
if err != nil {
return nil, err
}
return &proto.ReduceResponse{
Result: resp.(string),
}, nil
}
func (s *serverSingle) Setbit(
cxt context.Context,
req *proto.SetbitRequest,
) (*proto.SetbitResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return nil, s.dao.Setbit(req.Key.Key, req.Val, req.Offer)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
if err != nil {
return nil, err
}
return &proto.SetbitResponse{}, nil
}
func (s *serverSingle) Getbit(
cxt context.Context,
req *proto.GetbitRequest,
) (*proto.GetbitResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.GetBit(req.Key.Key, req.Offer)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
flag, err := lruEvent.StartWaitEvent(s.timeOut)
if err != nil {
return nil, err
}
return &proto.GetbitResponse{
Val: flag.(bool),
}, nil
}

View File

@ -1,237 +0,0 @@
package server
import (
context "context"
"gitee.com/timedb/wheatCache/pkg/errorx"
"gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/proto"
"gitee.com/timedb/wheatCache/pkg/structure"
)
func (s *serverSingle) Set(
cxt context.Context,
req *proto.SetRequest,
) (*proto.SetResponse, error) {
lruEvent := event.NewEvent("lru event")
lruEvent.InitWaitEvent()
work := event.EventWorkFunc(func() (interface{}, error) {
value := mockLruValue() // TODO 替换为从 lru 获取
switch value.(type) {
case structure.StringXInterface:
resp, err := value.(structure.StringXInterface).Set(req)
if err != nil {
return nil, err
}
return resp, nil
default:
return nil, errorx.New("value err")
}
})
lruEvent.SetValue("lru work", work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.ttl)
if err != nil {
return nil, err
}
switch resp.(type) {
case *proto.SetResponse:
default:
return nil, errorx.New("value err")
}
return resp.(*proto.SetResponse), nil
}
func (s *serverSingle) Get(
cxt context.Context,
req *proto.GetRequest,
) (*proto.GetResponse, error) {
lruEvent := event.NewEvent("lru event")
lruEvent.InitWaitEvent()
work := event.EventWorkFunc(func() (interface{}, error) {
value := mockLruValue() // TODO 替换为从 lru 获取
switch value.(type) {
case structure.StringXInterface:
resp, err := value.(structure.StringXInterface).Get(req)
if err != nil {
return nil, err
}
return resp, nil
default:
return nil, errorx.New("value err")
}
})
lruEvent.SetValue("lru work", work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.ttl)
if err != nil {
return nil, err
}
switch resp.(type) {
case *proto.GetResponse:
default:
return nil, errorx.New("value err")
}
return resp.(*proto.GetResponse), nil
}
func (s *serverSingle) Add(
cxt context.Context,
req *proto.AddRequest,
) (*proto.AddResponse, error) {
lruEvent := event.NewEvent("lru event")
lruEvent.InitWaitEvent()
work := event.EventWorkFunc(func() (interface{}, error) {
value := mockLruValue() // TODO 替换为从 lru 获取
switch value.(type) {
case structure.StringXInterface:
resp, err := value.(structure.StringXInterface).Add(req)
if err != nil {
return nil, err
}
return resp, nil
default:
return nil, errorx.New("value err")
}
})
lruEvent.SetValue("lru work", work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.ttl)
if err != nil {
return nil, err
}
switch resp.(type) {
case *proto.AddResponse:
default:
return nil, errorx.New("value err")
}
return resp.(*proto.AddResponse), nil
}
func (s *serverSingle) Reduce(
cxt context.Context,
req *proto.ReduceRequest,
) (*proto.ReduceResponse, error) {
lruEvent := event.NewEvent("lru event")
lruEvent.InitWaitEvent()
work := event.EventWorkFunc(func() (interface{}, error) {
value := mockLruValue() // TODO 替换为从 lru 获取
switch value.(type) {
case structure.StringXInterface:
resp, err := value.(structure.StringXInterface).Reduce(req)
if err != nil {
return nil, err
}
return resp, nil
default:
return nil, errorx.New("value err")
}
})
lruEvent.SetValue("lru work", work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.ttl)
if err != nil {
return nil, err
}
switch resp.(type) {
case *proto.ReduceResponse:
default:
return nil, errorx.New("value err")
}
return resp.(*proto.ReduceResponse), nil
}
func (s *serverSingle) Setbit(
cxt context.Context,
req *proto.SetbitRequest,
) (*proto.SetbitResponse, error) {
lruEvent := event.NewEvent("lru event")
lruEvent.InitWaitEvent()
work := event.EventWorkFunc(func() (interface{}, error) {
value := mockLruValue() // TODO 替换为从 lru 获取
switch value.(type) {
case structure.StringXInterface:
resp, err := value.(structure.StringXInterface).Setbit(req)
if err != nil {
return nil, err
}
return resp, nil
default:
return nil, errorx.New("value err")
}
})
lruEvent.SetValue("lru work", work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.ttl)
if err != nil {
return nil, err
}
switch resp.(type) {
case *proto.SetbitResponse:
default:
return nil, errorx.New("value err")
}
return resp.(*proto.SetbitResponse), nil
}
func (s *serverSingle) Getbit(
cxt context.Context,
req *proto.GetbitRequest,
) (*proto.GetbitResponse, error) {
lruEvent := event.NewEvent("lru event")
lruEvent.InitWaitEvent()
work := event.EventWorkFunc(func() (interface{}, error) {
value := mockLruValue() // TODO 替换为从 lru 获取
switch value.(type) {
case structure.StringXInterface:
resp, err := value.(structure.StringXInterface).Getbit(req)
if err != nil {
return nil, err
}
return resp, nil
default:
return nil, errorx.New("value err")
}
})
lruEvent.SetValue("lru work", work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.ttl)
if err != nil {
return nil, err
}
switch resp.(type) {
case *proto.GetbitResponse:
default:
return nil, errorx.New("value err")
}
return resp.(*proto.GetbitResponse), nil
}