feay(storage-dao): add setx type dao

This commit is contained in:
bandl 2021-11-16 00:18:12 +08:00
parent d68209c94c
commit af6426f83b
1 changed files with 442 additions and 27 deletions

View File

@ -1,55 +1,470 @@
package dao
import "gitee.com/wheat-os/wheatCache/pkg/proto"
import (
"context"
func (d *Dao) SAdd(_ *proto.BaseKey, _ []string) (*proto.SAddResponse, error) {
panic("not implemented") // TODO: Implement
"gitee.com/wheat-os/wheatCache/pkg/errorx"
"gitee.com/wheat-os/wheatCache/pkg/event2"
"gitee.com/wheat-os/wheatCache/pkg/proto"
"gitee.com/wheat-os/wheatCache/pkg/structure"
"gitee.com/wheat-os/wheatCache/pkg/structure/hashx"
"gitee.com/wheat-os/wheatCache/storage/external"
)
func (d *Dao) SAdd(key *proto.BaseKey, setVal []string) (*proto.SAddResponse, error) {
value, ok := d.lru.Get(key)
if !ok {
hashVal := hashx.NewHashXSingle()
for _, sv := range setVal {
hashVal.SetX(sv, sv)
}
d.lru.Add(key, hashVal)
}
hashVal, ok := value.(structure.HashXInterface)
if !ok {
return nil, errorx.DaoTypeErr("setx")
}
for _, sv := range setVal {
b, up := hashVal.SetX(sv, sv)
if b {
d.lru.UpdateLruSize(up)
}
}
return &proto.SAddResponse{}, nil
}
func (d *Dao) SCard(_ *proto.BaseKey) (*proto.SCardResponse, error) {
panic("not implemented") // TODO: Implement
func (d *Dao) SCard(key *proto.BaseKey) (*proto.SCardResponse, error) {
value, ok := d.lru.Get(key)
if !ok {
return nil, errorx.KeyBaseIsNilErr()
}
hashVal, ok := value.(structure.HashXInterface)
if !ok {
return nil, errorx.DaoTypeErr("setx")
}
return &proto.SCardResponse{Length: int32(hashVal.Length())}, nil
}
func (d *Dao) SDiff(_ *proto.BaseKey, _ []string) (*proto.SDiffResponse, error) {
panic("not implemented") // TODO: Implement
func mathSDiff(masterItem []string, extKey []string) ([]string, error) {
cli, err := external.NewGatewayClient()
if err != nil {
return nil, err
}
m := make(map[string]struct{})
for _, bVal := range masterItem {
m[bVal] = struct{}{}
}
setItem := make([]string, 0, len(extKey))
ctx := context.Background()
for _, sKey := range extKey {
baseKey := proto.NewBaseKey(sKey)
resp, err := cli.SScan(ctx, &proto.SScanRequest{
Key: baseKey,
Count: -1,
})
if err != nil {
continue
}
for _, item := range resp.Results {
if _, ok := m[item]; !ok {
setItem = append(setItem, item)
m[item] = struct{}{}
}
}
}
return setItem, nil
}
func (d *Dao) SDiffStore(_ *proto.BaseKey, _ []string, _ string) (*proto.SDiffStoreResponse, error) {
panic("not implemented") // TODO: Implement
func (d *Dao) SDiff(key *proto.BaseKey, setKey []string) (interface{}, error) {
value, ok := d.lru.Get(key)
if !ok {
return nil, errorx.KeyBaseIsNilErr()
}
hashVal, ok := value.(structure.HashXInterface)
if !ok {
return nil, errorx.DaoTypeErr("setx")
}
baseItem := hashVal.Key()
// await 挂起
return event2.EventAwaitFunc(func() (interface{}, error) {
setItem, err := mathSDiff(baseItem, setKey)
if err != nil {
return nil, err
}
return &proto.SDiffResponse{Result: setItem}, nil
}), nil
}
func (d *Dao) SInter(_ *proto.BaseKey, _ []string) (*proto.SInterResponse, error) {
panic("not implemented") // TODO: Implement
func (d *Dao) SDiffStore(key *proto.BaseKey, setKey []string, saveKey string) (interface{}, error) {
value, ok := d.lru.Get(key)
if !ok {
return nil, errorx.KeyBaseIsNilErr()
}
hashVal, ok := value.(structure.HashXInterface)
if !ok {
return nil, errorx.DaoTypeErr("setx")
}
baseItem := hashVal.Key()
// await 挂起
return event2.EventAwaitFunc(func() (interface{}, error) {
setItem, err := mathSDiff(baseItem, setKey)
if err != nil {
return nil, err
}
cli, err := external.NewGatewayClient()
if err != nil {
return nil, err
}
ctx := context.Background()
_, err = cli.SAdd(ctx, &proto.SAddRequest{
Key: proto.NewBaseKey(saveKey),
Member: setItem,
})
if err != nil {
return nil, err
}
return &proto.SDiffStoreResponse{}, nil
}), nil
}
func (d *Dao) SInterStore(_ *proto.BaseKey, _ []string, _ string) (*proto.SInterStoreResponse, error) {
panic("not implemented") // TODO: Implement
func mathSInter(masterItem []string, extKey []string) ([]string, error) {
cli, err := external.NewGatewayClient()
if err != nil {
return nil, err
}
m := make(map[string]struct{})
for _, bVal := range masterItem {
m[bVal] = struct{}{}
}
setItem := make([]string, 0, len(extKey))
ctx := context.Background()
for _, sKey := range extKey {
resp, err := cli.SScan(ctx, &proto.SScanRequest{
Key: proto.NewBaseKey(sKey),
Count: -1,
})
if err != nil {
continue
}
for _, item := range resp.Results {
if _, ok := m[item]; ok {
setItem = append(setItem, item)
delete(m, item)
}
}
}
return setItem, nil
}
func (d *Dao) SIsMember(_ *proto.BaseKey, _ string) (*proto.SIsMemberResponse, error) {
panic("not implemented") // TODO: Implement
func (d *Dao) SInter(key *proto.BaseKey, setKey []string) (interface{}, error) {
value, ok := d.lru.Get(key)
if !ok {
return nil, errorx.KeyBaseIsNilErr()
}
hashVal, ok := value.(structure.HashXInterface)
if !ok {
return nil, errorx.DaoTypeErr("setx")
}
baseItem := hashVal.Key()
// await 挂起
return event2.EventAwaitFunc(func() (interface{}, error) {
setItem, err := mathSInter(baseItem, setKey)
if err != nil {
return nil, err
}
return &proto.SInterResponse{Result: setItem}, nil
}), nil
}
func (d *Dao) SMove(_ *proto.BaseKey, _ string, _ []string) (*proto.SMoveResponse, error) {
panic("not implemented") // TODO: Implement
func (d *Dao) SInterStore(key *proto.BaseKey, setKey []string, saveKey string) (interface{}, error) {
value, ok := d.lru.Get(key)
if !ok {
return nil, errorx.KeyBaseIsNilErr()
}
hashVal, ok := value.(structure.HashXInterface)
if !ok {
return nil, errorx.DaoTypeErr("setx")
}
baseItem := hashVal.Key()
// await 挂起
return event2.EventAwaitFunc(func() (interface{}, error) {
setItem, err := mathSInter(baseItem, setKey)
if err != nil {
return nil, err
}
cli, err := external.NewGatewayClient()
if err != nil {
return nil, err
}
ctx := context.Background()
_, err = cli.SAdd(ctx, &proto.SAddRequest{
Key: proto.NewBaseKey(saveKey),
Member: setItem,
})
if err != nil {
return nil, err
}
return &proto.SInterStoreResponse{}, nil
}), nil
}
func (d *Dao) SPop(_ *proto.BaseKey, _ int32) (*proto.SPopResponse, error) {
panic("not implemented") // TODO: Implement
func (d *Dao) SIsMember(key *proto.BaseKey, member string) (*proto.SIsMemberResponse, error) {
value, ok := d.lru.Get(key)
if !ok {
return nil, errorx.KeyBaseIsNilErr()
}
hashVal, ok := value.(structure.HashXInterface)
if !ok {
return nil, errorx.DaoTypeErr("setx")
}
_, err := hashVal.Get(member)
if err != nil {
return &proto.SIsMemberResponse{Exist: false}, nil
}
return &proto.SIsMemberResponse{Exist: true}, nil
}
func (d *Dao) SRem(_ *proto.BaseKey, _ int32) (*proto.SRemResponse, error) {
panic("not implemented") // TODO: Implement
func (d *Dao) SMove(key *proto.BaseKey, moveKey string, members []string) (interface{}, error) {
value, ok := d.lru.Get(key)
if !ok {
return nil, errorx.KeyBaseIsNilErr()
}
hashVal, ok := value.(structure.HashXInterface)
if !ok {
return nil, errorx.DaoTypeErr("setx")
}
moveMembers := make([]string, 0, len(members))
for _, member := range members {
up, err := hashVal.Del(member)
if err == nil {
d.lru.UpdateLruSize(up)
moveMembers = append(moveMembers, member)
}
}
return event2.EventAwaitFunc(func() (interface{}, error) {
cli, err := external.NewGatewayClient()
if err != nil {
return nil, err
}
ctx := context.Background()
_, err = cli.SAdd(ctx, &proto.SAddRequest{
Key: proto.NewBaseKey(moveKey),
Member: moveMembers,
})
if err != nil {
return nil, err
}
return &proto.SMoveResponse{}, nil
}), nil
}
func (d *Dao) SUnion(_ *proto.BaseKey, _ []string) (*proto.SUnionResponse, error) {
panic("not implemented") // TODO: Implement
func (d *Dao) SPop(key *proto.BaseKey, count int32) (*proto.SPopResponse, error) {
value, ok := d.lru.Get(key)
if !ok {
return nil, errorx.KeyBaseIsNilErr()
}
hashVal, ok := value.(structure.HashXInterface)
if !ok {
return nil, errorx.DaoTypeErr("setx")
}
members := make([]string, 0, count)
result := hashVal.Range(0, int(count), "")
for _, res := range result {
up, err := hashVal.Del(res)
if err != nil {
return nil, err
}
d.lru.UpdateLruSize(up)
members = append(members, res)
}
return &proto.SPopResponse{Members: members}, nil
}
func (d *Dao) SUnionStore(_ *proto.BaseKey, _ []string, _ string) (*proto.SUnionStoreResponse, error) {
panic("not implemented") // TODO: Implement
func (d *Dao) SRem(key *proto.BaseKey, count int32) (*proto.SRemResponse, error) {
value, ok := d.lru.Get(key)
if !ok {
return nil, errorx.KeyBaseIsNilErr()
}
hashVal, ok := value.(structure.HashXInterface)
if !ok {
return nil, errorx.DaoTypeErr("setx")
}
result := hashVal.Range(0, int(count), "")
for _, res := range result {
up, err := hashVal.Del(res)
if err != nil {
return nil, err
}
d.lru.UpdateLruSize(up)
}
return &proto.SRemResponse{}, nil
}
func (d *Dao) SScan(_ *proto.BaseKey, _ int32, _ string, _ int32) (*proto.SScanResponse, error) {
panic("not implemented") // TODO: Implement
func mathSUnion(masterItem []string, extKey []string) ([]string, error) {
cli, err := external.NewGatewayClient()
if err != nil {
return nil, err
}
m := make(map[string]struct{})
for _, bVal := range masterItem {
m[bVal] = struct{}{}
}
ctx := context.Background()
for _, sKey := range extKey {
resp, err := cli.SScan(ctx, &proto.SScanRequest{
Key: proto.NewBaseKey(sKey),
Count: -1,
})
if err != nil {
continue
}
for _, item := range resp.Results {
if _, ok := m[item]; !ok {
masterItem = append(masterItem, item)
m[item] = struct{}{}
}
}
}
return masterItem, nil
}
func (d *Dao) SUnion(key *proto.BaseKey, setKey []string) (interface{}, error) {
value, ok := d.lru.Get(key)
if !ok {
return nil, errorx.KeyBaseIsNilErr()
}
hashVal, ok := value.(structure.HashXInterface)
if !ok {
return nil, errorx.DaoTypeErr("setx")
}
baseItem := hashVal.Key()
// await 挂起
return event2.EventAwaitFunc(func() (interface{}, error) {
setItem, err := mathSUnion(baseItem, setKey)
if err != nil {
return nil, err
}
return &proto.SUnionResponse{Result: setItem}, nil
}), nil
}
func (d *Dao) SUnionStore(key *proto.BaseKey, setKey []string, saveKey string) (interface{}, error) {
value, ok := d.lru.Get(key)
if !ok {
return nil, errorx.KeyBaseIsNilErr()
}
hashVal, ok := value.(structure.HashXInterface)
if !ok {
return nil, errorx.DaoTypeErr("setx")
}
baseItem := hashVal.Key()
// await 挂起
return event2.EventAwaitFunc(func() (interface{}, error) {
setItem, err := mathSUnion(baseItem, setKey)
if err != nil {
return nil, err
}
cli, err := external.NewGatewayClient()
if err != nil {
return nil, err
}
ctx := context.Background()
_, err = cli.SAdd(ctx, &proto.SAddRequest{
Key: proto.NewBaseKey(saveKey),
Member: setItem,
})
if err != nil {
return nil, err
}
return &proto.SUnionStoreResponse{}, nil
}), nil
}
func (d *Dao) SScan(key *proto.BaseKey, cursor int32, regex string, count int32) (*proto.SScanResponse, error) {
value, ok := d.lru.Get(key)
if !ok {
return nil, errorx.KeyBaseIsNilErr()
}
hashVal, ok := value.(structure.HashXInterface)
if !ok {
return nil, errorx.DaoTypeErr("setx")
}
result := hashVal.Range(int(cursor), int(count), regex)
return &proto.SScanResponse{Results: result}, nil
}