commit
96c8a6d53e
|
@ -57,10 +57,6 @@ func (e *Event) GetValue(key string) (interface{}, bool) {
|
||||||
return val, ok
|
return val, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Event) SetErr(err error) {
|
|
||||||
e.err = err
|
|
||||||
}
|
|
||||||
|
|
||||||
// InitWaitEvent 初始化 wait event 必须调用才拥有等待特性
|
// InitWaitEvent 初始化 wait event 必须调用才拥有等待特性
|
||||||
func (e *Event) InitWaitEvent() {
|
func (e *Event) InitWaitEvent() {
|
||||||
e.muClose.Lock()
|
e.muClose.Lock()
|
||||||
|
@ -98,14 +94,28 @@ func (e *Event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) {
|
||||||
e.eventStatus = workEventState
|
e.eventStatus = workEventState
|
||||||
|
|
||||||
res, err := work()
|
res, err := work()
|
||||||
e.waitResult <- res
|
|
||||||
e.err = err
|
e.err = err
|
||||||
|
e.waitResult <- res
|
||||||
|
|
||||||
close(e.waitResult)
|
close(e.waitResult)
|
||||||
e.eventStatus = closeEventState
|
e.eventStatus = closeEventState
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *Event) SetResultErr(err error) {
|
||||||
|
e.muClose.Lock()
|
||||||
|
defer e.muClose.Unlock()
|
||||||
|
if e.eventStatus != waitEventState {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
e.eventStatus = workEventState
|
||||||
|
e.err = err
|
||||||
|
e.waitResult <- nil
|
||||||
|
close(e.waitResult)
|
||||||
|
e.eventStatus = closeEventState
|
||||||
|
}
|
||||||
|
|
||||||
func NewEvent(eventName string) *Event {
|
func NewEvent(eventName string) *Event {
|
||||||
return &Event{
|
return &Event{
|
||||||
eventName: eventName,
|
eventName: eventName,
|
||||||
|
|
|
@ -3,6 +3,7 @@ package event
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"gitee.com/timedb/wheatCache/pkg/errorx"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -79,3 +80,20 @@ func waitConsumer(t *testing.T, ctx context.Context, v ConsumerInterface) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEvent_SetResultErr(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
event := NewEvent("dddd")
|
||||||
|
driver := NewDriver(100)
|
||||||
|
produce := NewProduce(driver)
|
||||||
|
consumer := NewConsumer(driver)
|
||||||
|
go func() {
|
||||||
|
event := consumer.Receive(ctx)
|
||||||
|
event.SetResultErr(errorx.New("err"))
|
||||||
|
}()
|
||||||
|
event.InitWaitEvent()
|
||||||
|
produce.Call(ctx, event)
|
||||||
|
_, err := event.StartWaitEvent(2 * time.Second)
|
||||||
|
fmt.Println(err)
|
||||||
|
require.Error(t, err)
|
||||||
|
}
|
||||||
|
|
|
@ -2,8 +2,8 @@ package lru
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"gitee.com/timedb/wheatCache/pkg/errorx"
|
||||||
"gitee.com/timedb/wheatCache/pkg/event"
|
"gitee.com/timedb/wheatCache/pkg/event"
|
||||||
"log"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (lru *SingleCache) lruSingleWork() interface{} {
|
func (lru *SingleCache) lruSingleWork() interface{} {
|
||||||
|
@ -11,16 +11,20 @@ func (lru *SingleCache) lruSingleWork() interface{} {
|
||||||
for {
|
for {
|
||||||
workEvent := lru.lruConsumer.Receive(ctx)
|
workEvent := lru.lruConsumer.Receive(ctx)
|
||||||
|
|
||||||
workFunc, ok := workEvent.GetValue(WorkFuncEventKey)
|
switch workEvent.GetEventName() {
|
||||||
if !ok {
|
case OptionEventName:
|
||||||
continue
|
workFunc, ok := workEvent.GetValue(WorkFuncEventKey)
|
||||||
}
|
if !ok {
|
||||||
|
workEvent.ExecWorkAndSendResult(func() (interface{}, error) {
|
||||||
|
return nil, errorx.New("the event haven't work of function")
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
switch workFunc.(type) {
|
if work, ok := workFunc.(event.EventWorkFunc); ok {
|
||||||
case event.EventWorkFunc:
|
workEvent.ExecWorkAndSendResult(work)
|
||||||
workEvent.ExecWorkAndSendResult(workFunc.(event.EventWorkFunc))
|
}
|
||||||
default:
|
case CleanEventName:
|
||||||
log.Print("this is debug ")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue