diff --git a/storage/cmd/root.go b/storage/cmd/root.go index b08b1c6..e011f95 100644 --- a/storage/cmd/root.go +++ b/storage/cmd/root.go @@ -3,6 +3,7 @@ package cmd import ( "fmt" + "gitee.com/timedb/wheatCache/storage/server/single" "log" "net" "os" @@ -11,7 +12,6 @@ import ( _ "gitee.com/timedb/wheatCache/conf" "gitee.com/timedb/wheatCache/pkg/proto" - "gitee.com/timedb/wheatCache/storage/server" "github.com/spf13/cobra" "github.com/spf13/viper" "google.golang.org/grpc" @@ -24,7 +24,7 @@ var rootCmd = &cobra.Command{ Short: "storage", Long: `start storage server`, Run: func(cmd *cobra.Command, args []string) { - storageServer := server.NewServer() + storageServer := single.NewServer() // 先写死, 等配置文件 conf := viper.GetStringMap("storage") host := conf["host"].(string) diff --git a/storage/server/single.go b/storage/server/single.go deleted file mode 100644 index dd08c08..0000000 --- a/storage/server/single.go +++ /dev/null @@ -1,27 +0,0 @@ -package server - -import ( - "gitee.com/timedb/wheatCache/pkg/event" - "gitee.com/timedb/wheatCache/pkg/proto" - "gitee.com/timedb/wheatCache/pkg/structure" - - "gitee.com/timedb/wheatCache/pkg/structure/stringx" - - "time" -) - -type serverSingle struct { - middleProduce event.ProduceInterface - lruProduce event.ProduceInterface - ttl time.Duration -} - -func NewServer() proto.CommServerServer { - ser := &serverSingle{} - return ser -} - -// TODO 移除 -func mockLruValue() structure.KeyBaseInterface { - return stringx.NewStringSingle() -} diff --git a/storage/server/single/define.go b/storage/server/single/define.go new file mode 100644 index 0000000..8e650fd --- /dev/null +++ b/storage/server/single/define.go @@ -0,0 +1,12 @@ +package single + +import ( + "sync" +) + +var ( + oneSingleServer sync.Once + sysSingleServer *serverSingle +) + +const timeOutDefault = 2 diff --git a/storage/server/single/single.go b/storage/server/single/single.go new file mode 100644 index 0000000..13a6f18 --- /dev/null +++ b/storage/server/single/single.go @@ -0,0 +1,38 @@ +package single + +import ( + "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/pkg/lru" + "gitee.com/timedb/wheatCache/storage/dao" + "github.com/spf13/viper" + "time" +) + +type serverSingle struct { + middleProduce event.ProduceInterface + lruProduce event.ProduceInterface + timeOut time.Duration + lruCache *lru.SingleCache + dao *dao.Dao +} + +func NewServer() *serverSingle { + oneSingleServer.Do(func() { + timeOut := viper.GetInt("storage.timeOut") + if timeOut == 0 { + timeOut = timeOutDefault + } + + lruCache := lru.NewLRUCache() + + ser := &serverSingle{ + lruCache: lruCache, + lruProduce: event.NewProduce(lruCache.GetDriver()), + timeOut: time.Duration(timeOut), + dao: dao.NewDao(lruCache), + } + sysSingleServer = ser + + }) + return sysSingleServer +} diff --git a/storage/server/single/stringx.go b/storage/server/single/stringx.go new file mode 100644 index 0000000..66f6e36 --- /dev/null +++ b/storage/server/single/stringx.go @@ -0,0 +1,134 @@ +package single + +import ( + context "context" + "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/pkg/lru" + "gitee.com/timedb/wheatCache/pkg/proto" +) + +func (s *serverSingle) Set( + cxt context.Context, + req *proto.SetRequest, +) (*proto.SetResponse, error) { + + work := event.EventWorkFunc(func() (interface{}, error) { + return s.dao.Set(req.Key.Key, req.Val) + }) + + lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent.InitWaitEvent() + lruEvent.SetValue(lru.WorkFuncEventKey, work) + s.lruProduce.Call(cxt, lruEvent) + resp, err := lruEvent.StartWaitEvent(s.timeOut) + if err != nil { + return nil, err + } + + return &proto.SetResponse{ + Result: resp.(string), + }, nil +} + +func (s *serverSingle) Get( + cxt context.Context, + req *proto.GetRequest, +) (*proto.GetResponse, error) { + work := event.EventWorkFunc(func() (interface{}, error) { + return s.dao.Get(req.Key.Key) + }) + + lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent.InitWaitEvent() + lruEvent.SetValue(lru.WorkFuncEventKey, work) + s.lruProduce.Call(cxt, lruEvent) + resp, err := lruEvent.StartWaitEvent(s.timeOut) + if err != nil { + return nil, err + } + return &proto.GetResponse{ + Result: resp.(string), + }, nil +} + +func (s serverSingle) Add( + cxt context.Context, + req *proto.AddRequest, +) (*proto.AddResponse, error) { + work := event.EventWorkFunc(func() (interface{}, error) { + return s.dao.Add(req.Key.Key, req.Renewal) + }) + + lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent.InitWaitEvent() + lruEvent.SetValue(lru.WorkFuncEventKey, work) + s.lruProduce.Call(cxt, lruEvent) + resp, err := lruEvent.StartWaitEvent(s.timeOut) + if err != nil { + return nil, err + } + return &proto.AddResponse{ + Result: resp.(string), + }, nil +} + +func (s *serverSingle) Reduce( + cxt context.Context, + req *proto.ReduceRequest, +) (*proto.ReduceResponse, error) { + work := event.EventWorkFunc(func() (interface{}, error) { + return s.dao.Add(req.Key.Key, req.Renewal) + }) + + lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent.InitWaitEvent() + lruEvent.SetValue(lru.WorkFuncEventKey, work) + s.lruProduce.Call(cxt, lruEvent) + resp, err := lruEvent.StartWaitEvent(s.timeOut) + if err != nil { + return nil, err + } + return &proto.ReduceResponse{ + Result: resp.(string), + }, nil +} + +func (s *serverSingle) Setbit( + cxt context.Context, + req *proto.SetbitRequest, +) (*proto.SetbitResponse, error) { + work := event.EventWorkFunc(func() (interface{}, error) { + return nil, s.dao.Setbit(req.Key.Key, req.Val, req.Offer) + }) + + lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent.InitWaitEvent() + lruEvent.SetValue(lru.WorkFuncEventKey, work) + s.lruProduce.Call(cxt, lruEvent) + _, err := lruEvent.StartWaitEvent(s.timeOut) + if err != nil { + return nil, err + } + return &proto.SetbitResponse{}, nil +} + +func (s *serverSingle) Getbit( + cxt context.Context, + req *proto.GetbitRequest, +) (*proto.GetbitResponse, error) { + work := event.EventWorkFunc(func() (interface{}, error) { + return s.dao.GetBit(req.Key.Key, req.Offer) + }) + + lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent.InitWaitEvent() + lruEvent.SetValue(lru.WorkFuncEventKey, work) + s.lruProduce.Call(cxt, lruEvent) + flag, err := lruEvent.StartWaitEvent(s.timeOut) + if err != nil { + return nil, err + } + return &proto.GetbitResponse{ + Val: flag.(bool), + }, nil +} diff --git a/storage/server/stringx.go b/storage/server/stringx.go deleted file mode 100644 index 793a566..0000000 --- a/storage/server/stringx.go +++ /dev/null @@ -1,237 +0,0 @@ -package server - -import ( - context "context" - "gitee.com/timedb/wheatCache/pkg/errorx" - "gitee.com/timedb/wheatCache/pkg/event" - "gitee.com/timedb/wheatCache/pkg/proto" - "gitee.com/timedb/wheatCache/pkg/structure" -) - -func (s *serverSingle) Set( - cxt context.Context, - req *proto.SetRequest, -) (*proto.SetResponse, error) { - lruEvent := event.NewEvent("lru event") - lruEvent.InitWaitEvent() - - work := event.EventWorkFunc(func() (interface{}, error) { - value := mockLruValue() // TODO 替换为从 lru 获取 - switch value.(type) { - case structure.StringXInterface: - resp, err := value.(structure.StringXInterface).Set(req) - if err != nil { - return nil, err - } - return resp, nil - - default: - return nil, errorx.New("value err") - } - - }) - - lruEvent.SetValue("lru work", work) - s.lruProduce.Call(cxt, lruEvent) - resp, err := lruEvent.StartWaitEvent(s.ttl) - if err != nil { - return nil, err - } - - switch resp.(type) { - case *proto.SetResponse: - default: - return nil, errorx.New("value err") - } - return resp.(*proto.SetResponse), nil -} - -func (s *serverSingle) Get( - cxt context.Context, - req *proto.GetRequest, -) (*proto.GetResponse, error) { - lruEvent := event.NewEvent("lru event") - lruEvent.InitWaitEvent() - - work := event.EventWorkFunc(func() (interface{}, error) { - value := mockLruValue() // TODO 替换为从 lru 获取 - switch value.(type) { - case structure.StringXInterface: - resp, err := value.(structure.StringXInterface).Get(req) - if err != nil { - return nil, err - } - return resp, nil - - default: - return nil, errorx.New("value err") - } - - }) - - lruEvent.SetValue("lru work", work) - s.lruProduce.Call(cxt, lruEvent) - resp, err := lruEvent.StartWaitEvent(s.ttl) - if err != nil { - return nil, err - } - - switch resp.(type) { - case *proto.GetResponse: - default: - return nil, errorx.New("value err") - } - return resp.(*proto.GetResponse), nil -} - -func (s *serverSingle) Add( - cxt context.Context, - req *proto.AddRequest, -) (*proto.AddResponse, error) { - lruEvent := event.NewEvent("lru event") - lruEvent.InitWaitEvent() - - work := event.EventWorkFunc(func() (interface{}, error) { - value := mockLruValue() // TODO 替换为从 lru 获取 - switch value.(type) { - case structure.StringXInterface: - resp, err := value.(structure.StringXInterface).Add(req) - if err != nil { - return nil, err - } - return resp, nil - - default: - return nil, errorx.New("value err") - } - - }) - - lruEvent.SetValue("lru work", work) - s.lruProduce.Call(cxt, lruEvent) - resp, err := lruEvent.StartWaitEvent(s.ttl) - if err != nil { - return nil, err - } - - switch resp.(type) { - case *proto.AddResponse: - default: - return nil, errorx.New("value err") - } - return resp.(*proto.AddResponse), nil -} - -func (s *serverSingle) Reduce( - cxt context.Context, - req *proto.ReduceRequest, -) (*proto.ReduceResponse, error) { - lruEvent := event.NewEvent("lru event") - lruEvent.InitWaitEvent() - - work := event.EventWorkFunc(func() (interface{}, error) { - value := mockLruValue() // TODO 替换为从 lru 获取 - switch value.(type) { - case structure.StringXInterface: - resp, err := value.(structure.StringXInterface).Reduce(req) - if err != nil { - return nil, err - } - return resp, nil - - default: - return nil, errorx.New("value err") - } - - }) - - lruEvent.SetValue("lru work", work) - s.lruProduce.Call(cxt, lruEvent) - resp, err := lruEvent.StartWaitEvent(s.ttl) - if err != nil { - return nil, err - } - - switch resp.(type) { - case *proto.ReduceResponse: - default: - return nil, errorx.New("value err") - } - return resp.(*proto.ReduceResponse), nil -} - -func (s *serverSingle) Setbit( - cxt context.Context, - req *proto.SetbitRequest, -) (*proto.SetbitResponse, error) { - lruEvent := event.NewEvent("lru event") - lruEvent.InitWaitEvent() - - work := event.EventWorkFunc(func() (interface{}, error) { - value := mockLruValue() // TODO 替换为从 lru 获取 - switch value.(type) { - case structure.StringXInterface: - resp, err := value.(structure.StringXInterface).Setbit(req) - if err != nil { - return nil, err - } - return resp, nil - - default: - return nil, errorx.New("value err") - } - - }) - - lruEvent.SetValue("lru work", work) - s.lruProduce.Call(cxt, lruEvent) - resp, err := lruEvent.StartWaitEvent(s.ttl) - if err != nil { - return nil, err - } - - switch resp.(type) { - case *proto.SetbitResponse: - default: - return nil, errorx.New("value err") - } - return resp.(*proto.SetbitResponse), nil -} - -func (s *serverSingle) Getbit( - cxt context.Context, - req *proto.GetbitRequest, -) (*proto.GetbitResponse, error) { - lruEvent := event.NewEvent("lru event") - lruEvent.InitWaitEvent() - - work := event.EventWorkFunc(func() (interface{}, error) { - value := mockLruValue() // TODO 替换为从 lru 获取 - switch value.(type) { - case structure.StringXInterface: - resp, err := value.(structure.StringXInterface).Getbit(req) - if err != nil { - return nil, err - } - return resp, nil - - default: - return nil, errorx.New("value err") - } - - }) - - lruEvent.SetValue("lru work", work) - s.lruProduce.Call(cxt, lruEvent) - resp, err := lruEvent.StartWaitEvent(s.ttl) - if err != nil { - return nil, err - } - - switch resp.(type) { - case *proto.GetbitResponse: - default: - return nil, errorx.New("value err") - } - return resp.(*proto.GetbitResponse), nil -}