refactor(activity): make activity queue as a service

This commit is contained in:
LinkinStars 2023-06-05 14:10:04 +08:00
parent 773d2142db
commit c6ea2fc5da
13 changed files with 176 additions and 122 deletions

View File

@ -46,6 +46,7 @@ import (
"github.com/answerdev/answer/internal/service/action"
activity2 "github.com/answerdev/answer/internal/service/activity"
activity_common2 "github.com/answerdev/answer/internal/service/activity_common"
"github.com/answerdev/answer/internal/service/activity_queue"
"github.com/answerdev/answer/internal/service/answer_common"
auth2 "github.com/answerdev/answer/internal/service/auth"
"github.com/answerdev/answer/internal/service/collection_common"
@ -140,11 +141,12 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database,
tagRepo := tag.NewTagRepo(dataData, uniqueIDRepo)
revisionRepo := revision.NewRevisionRepo(dataData, uniqueIDRepo)
revisionService := revision_common.NewRevisionService(revisionRepo, userRepo)
tagCommonService := tag_common2.NewTagCommonService(tagCommonRepo, tagRelRepo, tagRepo, revisionService, siteInfoCommonService)
activityQueueService := activity_queue.NewActivityQueueService()
tagCommonService := tag_common2.NewTagCommonService(tagCommonRepo, tagRelRepo, tagRepo, revisionService, siteInfoCommonService, activityQueueService)
objService := object_info.NewObjService(answerRepo, questionRepo, commentCommonRepo, tagCommonRepo, tagCommonService)
voteRepo := activity_common.NewVoteRepo(dataData, activityRepo)
notificationQueueService := notice_queue.NewNotificationQueueService()
commentService := comment2.NewCommentService(commentRepo, commentCommonRepo, userCommon, objService, voteRepo, emailService, userRepo, notificationQueueService)
commentService := comment2.NewCommentService(commentRepo, commentCommonRepo, userCommon, objService, voteRepo, emailService, userRepo, notificationQueueService, activityQueueService)
rolePowerRelRepo := role.NewRolePowerRelRepo(dataData)
rolePowerRelService := role2.NewRolePowerRelService(rolePowerRelRepo, userRoleRelService)
rankService := rank2.NewRankService(userCommon, userRankRepo, objService, userRoleRelService, rolePowerRelService, configService)
@ -156,7 +158,7 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database,
voteService := service.NewVoteService(serviceVoteRepo, uniqueIDRepo, configService, questionRepo, answerRepo, commentCommonRepo, objService)
voteController := controller.NewVoteController(voteService, rankService)
followRepo := activity_common.NewFollowRepo(dataData, uniqueIDRepo, activityRepo)
tagService := tag2.NewTagService(tagRepo, tagCommonService, revisionService, followRepo, siteInfoCommonService)
tagService := tag2.NewTagService(tagRepo, tagCommonService, revisionService, followRepo, siteInfoCommonService, activityQueueService)
tagController := controller.NewTagController(tagService, tagCommonService, rankService)
followFollowRepo := activity.NewFollowRepo(dataData, uniqueIDRepo, activityRepo)
followService := follow.NewFollowService(followFollowRepo, followRepo, tagCommonRepo)
@ -167,14 +169,14 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database,
answerCommon := answercommon.NewAnswerCommon(answerRepo)
metaRepo := meta.NewMetaRepo(dataData)
metaService := meta2.NewMetaService(metaRepo)
questionCommon := questioncommon.NewQuestionCommon(questionRepo, answerRepo, voteRepo, followRepo, tagCommonService, userCommon, collectionCommon, answerCommon, metaService, configService)
questionCommon := questioncommon.NewQuestionCommon(questionRepo, answerRepo, voteRepo, followRepo, tagCommonService, userCommon, collectionCommon, answerCommon, metaService, configService, activityQueueService)
collectionService := service.NewCollectionService(collectionRepo, collectionGroupRepo, questionCommon)
collectionController := controller.NewCollectionController(collectionService)
answerActivityRepo := activity.NewAnswerActivityRepo(dataData, activityRepo, userRankRepo, notificationQueueService)
questionActivityRepo := activity.NewQuestionActivityRepo(dataData, activityRepo, userRankRepo)
answerActivityService := activity2.NewAnswerActivityService(answerActivityRepo, questionActivityRepo)
questionService := service.NewQuestionService(questionRepo, tagCommonService, questionCommon, userCommon, userRepo, revisionService, metaService, collectionCommon, answerActivityService, dataData, emailService, notificationQueueService)
answerService := service.NewAnswerService(answerRepo, questionRepo, questionCommon, userCommon, collectionCommon, userRepo, revisionService, answerActivityService, answerCommon, voteRepo, emailService, userRoleRelService, notificationQueueService)
questionService := service.NewQuestionService(questionRepo, tagCommonService, questionCommon, userCommon, userRepo, revisionService, metaService, collectionCommon, answerActivityService, dataData, emailService, notificationQueueService, activityQueueService)
answerService := service.NewAnswerService(answerRepo, questionRepo, questionCommon, userCommon, collectionCommon, userRepo, revisionService, answerActivityService, answerCommon, voteRepo, emailService, userRoleRelService, notificationQueueService, activityQueueService)
questionController := controller.NewQuestionController(questionService, answerService, rankService, siteInfoCommonService)
dashboardService := dashboard.NewDashboardService(questionRepo, answerRepo, commentCommonRepo, voteRepo, userRepo, reportRepo, configService, siteInfoCommonService, serviceConf, dataData)
answerController := controller.NewAnswerController(answerService, rankService, dashboardService)
@ -182,7 +184,7 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database,
searchRepo := search_common.NewSearchRepo(dataData, uniqueIDRepo, userCommon)
searchService := service.NewSearchService(searchParser, searchRepo)
searchController := controller.NewSearchController(searchService)
serviceRevisionService := service.NewRevisionService(revisionRepo, userCommon, questionCommon, answerService, objService, questionRepo, answerRepo, tagRepo, tagCommonService, notificationQueueService)
serviceRevisionService := service.NewRevisionService(revisionRepo, userCommon, questionCommon, answerService, objService, questionRepo, answerRepo, tagRepo, tagCommonService, notificationQueueService, activityQueueService)
revisionController := controller.NewRevisionController(serviceRevisionService, rankService)
rankController := controller.NewRankController(rankService)
reportHandle := report_handle_admin.NewReportHandle(questionCommon, commentRepo, configService, notificationQueueService)
@ -204,7 +206,7 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database,
notificationController := controller.NewNotificationController(notificationService, rankService)
dashboardController := controller.NewDashboardController(dashboardService)
uploadController := controller.NewUploadController(uploaderService)
activityCommon := activity_common2.NewActivityCommon(activityRepo)
activityCommon := activity_common2.NewActivityCommon(activityRepo, activityQueueService)
activityActivityRepo := activity.NewActivityRepo(dataData, configService)
commentCommonService := comment_common.NewCommentCommonService(commentCommonRepo)
activityService := activity2.NewActivityService(activityActivityRepo, userCommon, activityCommon, tagCommonService, objService, commentCommonService, revisionService, metaService, configService)

View File

@ -4,12 +4,13 @@ import "github.com/answerdev/answer/internal/base/constant"
// ActivityMsg activity message
type ActivityMsg struct {
UserID string `json:"user_id"`
TriggerUserID int64 `json:"trigger_user_id"`
ObjectID string `json:"object_id"`
OriginalObjectID string `json:"original_object_id"`
ActivityTypeKey constant.ActivityTypeKey `json:"activity_type_key"`
RevisionID string `json:"revision_id"`
UserID string
TriggerUserID int64
ObjectID string
OriginalObjectID string
ActivityTypeKey constant.ActivityTypeKey
RevisionID string
ExtraInfo map[string]string
}
// GetObjectTimelineReq get object timeline request

View File

@ -5,6 +5,7 @@ import (
"time"
"github.com/answerdev/answer/internal/entity"
"github.com/answerdev/answer/internal/schema"
"github.com/answerdev/answer/internal/service/activity_queue"
"github.com/answerdev/answer/pkg/converter"
"github.com/answerdev/answer/pkg/uid"
@ -27,51 +28,44 @@ type ActivityRepo interface {
}
type ActivityCommon struct {
activityRepo ActivityRepo
activityRepo ActivityRepo
activityQueueService activity_queue.ActivityQueueService
}
// NewActivityCommon new activity common
func NewActivityCommon(
activityRepo ActivityRepo,
activityQueueService activity_queue.ActivityQueueService,
) *ActivityCommon {
activity := &ActivityCommon{
activityRepo: activityRepo,
activityRepo: activityRepo,
activityQueueService: activityQueueService,
}
activity.HandleActivity()
activity.activityQueueService.RegisterHandler(activity.HandleActivity)
return activity
}
// HandleActivity handle activity message
func (ac *ActivityCommon) HandleActivity() {
go func() {
defer func() {
if err := recover(); err != nil {
log.Error(err)
}
}()
func (ac *ActivityCommon) HandleActivity(ctx context.Context, msg *schema.ActivityMsg) error {
activityType, err := ac.activityRepo.GetActivityTypeByConfigKey(context.Background(), string(msg.ActivityTypeKey))
if err != nil {
log.Errorf("error getting activity type %s, activity type is %d", err, activityType)
return err
}
for msg := range activity_queue.ActivityQueue {
log.Debugf("received activity %+v", msg)
activityType, err := ac.activityRepo.GetActivityTypeByConfigKey(context.Background(), string(msg.ActivityTypeKey))
if err != nil {
log.Errorf("error getting activity type %s, activity type is %d", err, activityType)
}
act := &entity.Activity{
UserID: msg.UserID,
TriggerUserID: msg.TriggerUserID,
ObjectID: uid.DeShortID(msg.ObjectID),
OriginalObjectID: uid.DeShortID(msg.OriginalObjectID),
ActivityType: activityType,
Cancelled: entity.ActivityAvailable,
}
if len(msg.RevisionID) > 0 {
act.RevisionID = converter.StringToInt64(msg.RevisionID)
}
if err := ac.activityRepo.AddActivity(context.TODO(), act); err != nil {
log.Error(err)
}
}
}()
act := &entity.Activity{
UserID: msg.UserID,
TriggerUserID: msg.TriggerUserID,
ObjectID: uid.DeShortID(msg.ObjectID),
OriginalObjectID: uid.DeShortID(msg.OriginalObjectID),
ActivityType: activityType,
Cancelled: entity.ActivityAvailable,
}
if len(msg.RevisionID) > 0 {
act.RevisionID = converter.StringToInt64(msg.RevisionID)
}
if err := ac.activityRepo.AddActivity(ctx, act); err != nil {
return err
}
return nil
}

View File

@ -1,14 +1,50 @@
package activity_queue
import (
"context"
"github.com/answerdev/answer/internal/schema"
"github.com/segmentfault/pacman/log"
)
var (
ActivityQueue = make(chan *schema.ActivityMsg, 128)
)
// AddActivity add new activity
func AddActivity(msg *schema.ActivityMsg) {
ActivityQueue <- msg
type ActivityQueueService interface {
Send(ctx context.Context, msg *schema.ActivityMsg)
RegisterHandler(handler func(ctx context.Context, msg *schema.ActivityMsg) error)
}
type activityQueueService struct {
Queue chan *schema.ActivityMsg
Handler func(ctx context.Context, msg *schema.ActivityMsg) error
}
func (ns *activityQueueService) Send(ctx context.Context, msg *schema.ActivityMsg) {
ns.Queue <- msg
}
func (ns *activityQueueService) RegisterHandler(
handler func(ctx context.Context, msg *schema.ActivityMsg) error) {
ns.Handler = handler
}
func (ns *activityQueueService) working() {
go func() {
for msg := range ns.Queue {
log.Debugf("received activity %+v", msg)
if ns.Handler == nil {
log.Warnf("no handler for activity")
continue
}
if err := ns.Handler(context.Background(), msg); err != nil {
log.Error(err)
}
}
}()
}
// NewActivityQueueService create a new activity queue service
func NewActivityQueueService() ActivityQueueService {
ns := &activityQueueService{}
ns.Queue = make(chan *schema.ActivityMsg, 128)
ns.working()
return ns
}

View File

@ -44,6 +44,7 @@ type AnswerService struct {
emailService *export.EmailService
roleService *role.UserRoleRelService
notificationQueueService notice_queue.NotificationQueueService
activityQueueService activity_queue.ActivityQueueService
}
func NewAnswerService(
@ -60,6 +61,7 @@ func NewAnswerService(
emailService *export.EmailService,
roleService *role.UserRoleRelService,
notificationQueueService notice_queue.NotificationQueueService,
activityQueueService activity_queue.ActivityQueueService,
) *AnswerService {
return &AnswerService{
answerRepo: answerRepo,
@ -75,6 +77,7 @@ func NewAnswerService(
emailService: emailService,
roleService: roleService,
notificationQueueService: notificationQueueService,
activityQueueService: activityQueueService,
}
}
@ -136,7 +139,7 @@ func (as *AnswerService) RemoveAnswer(ctx context.Context, req *schema.RemoveAns
//if err != nil {
// log.Errorf("delete answer activity change failed: %s", err.Error())
//}
activity_queue.AddActivity(&schema.ActivityMsg{
as.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: req.UserID,
ObjectID: answerInfo.ID,
OriginalObjectID: answerInfo.ID,
@ -202,14 +205,14 @@ func (as *AnswerService) Insert(ctx context.Context, req *schema.AnswerAddReq) (
as.notificationAnswerTheQuestion(ctx, questionInfo.UserID, questionInfo.ID, insertData.ID, req.UserID, questionInfo.Title,
insertData.OriginalText)
activity_queue.AddActivity(&schema.ActivityMsg{
as.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: insertData.UserID,
ObjectID: insertData.ID,
OriginalObjectID: insertData.ID,
ActivityTypeKey: constant.ActAnswerAnswered,
RevisionID: revisionID,
})
activity_queue.AddActivity(&schema.ActivityMsg{
as.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: insertData.UserID,
ObjectID: insertData.ID,
OriginalObjectID: questionInfo.ID,
@ -302,7 +305,7 @@ func (as *AnswerService) Update(ctx context.Context, req *schema.AnswerUpdateReq
return insertData.ID, err
}
if canUpdate {
activity_queue.AddActivity(&schema.ActivityMsg{
as.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: insertData.UserID,
ObjectID: insertData.ID,
OriginalObjectID: insertData.ID,
@ -469,7 +472,7 @@ func (as *AnswerService) AdminSetAnswerStatus(ctx context.Context, req *schema.A
//if err != nil {
// log.Errorf("admin delete question then rank rollback error %s", err.Error())
//}
activity_queue.AddActivity(&schema.ActivityMsg{
as.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: req.UserID,
ObjectID: answerInfo.ID,
OriginalObjectID: answerInfo.ID,

View File

@ -66,6 +66,7 @@ type CommentService struct {
emailService *export.EmailService
userRepo usercommon.UserRepo
notificationQueueService notice_queue.NotificationQueueService
activityQueueService activity_queue.ActivityQueueService
}
// NewCommentService new comment service
@ -78,6 +79,7 @@ func NewCommentService(
emailService *export.EmailService,
userRepo usercommon.UserRepo,
notificationQueueService notice_queue.NotificationQueueService,
activityQueueService activity_queue.ActivityQueueService,
) *CommentService {
return &CommentService{
commentRepo: commentRepo,
@ -88,6 +90,7 @@ func NewCommentService(
emailService: emailService,
userRepo: userRepo,
notificationQueueService: notificationQueueService,
activityQueueService: activityQueueService,
}
}
@ -164,7 +167,7 @@ func (cs *CommentService) AddComment(ctx context.Context, req *schema.AddComment
case constant.AnswerObjectType:
activityMsg.ActivityTypeKey = constant.ActAnswerCommented
}
activity_queue.AddActivity(activityMsg)
cs.activityQueueService.Send(ctx, activityMsg)
return resp, nil
}

View File

@ -48,6 +48,3 @@ func NewNotificationQueueService() NotificationQueueService {
ns.working()
return ns
}
func AddNotification2(msg *schema.NotificationMsg) {
}

View File

@ -4,6 +4,7 @@ import (
"github.com/answerdev/answer/internal/service/action"
"github.com/answerdev/answer/internal/service/activity"
"github.com/answerdev/answer/internal/service/activity_common"
"github.com/answerdev/answer/internal/service/activity_queue"
answercommon "github.com/answerdev/answer/internal/service/answer_common"
"github.com/answerdev/answer/internal/service/auth"
collectioncommon "github.com/answerdev/answer/internal/service/collection_common"
@ -88,4 +89,5 @@ var ProviderSetService = wire.NewSet(
plugin_common.NewPluginCommonService,
config.NewConfigService,
notice_queue.NewNotificationQueueService,
activity_queue.NewActivityQueueService,
)

View File

@ -54,16 +54,17 @@ type QuestionRepo interface {
// QuestionCommon user service
type QuestionCommon struct {
questionRepo QuestionRepo
answerRepo answercommon.AnswerRepo
voteRepo activity_common.VoteRepo
followCommon activity_common.FollowRepo
tagCommon *tagcommon.TagCommonService
userCommon *usercommon.UserCommon
collectionCommon *collectioncommon.CollectionCommon
AnswerCommon *answercommon.AnswerCommon
metaService *meta.MetaService
configService *config.ConfigService
questionRepo QuestionRepo
answerRepo answercommon.AnswerRepo
voteRepo activity_common.VoteRepo
followCommon activity_common.FollowRepo
tagCommon *tagcommon.TagCommonService
userCommon *usercommon.UserCommon
collectionCommon *collectioncommon.CollectionCommon
AnswerCommon *answercommon.AnswerCommon
metaService *meta.MetaService
configService *config.ConfigService
activityQueueService activity_queue.ActivityQueueService
}
func NewQuestionCommon(questionRepo QuestionRepo,
@ -76,18 +77,20 @@ func NewQuestionCommon(questionRepo QuestionRepo,
answerCommon *answercommon.AnswerCommon,
metaService *meta.MetaService,
configService *config.ConfigService,
activityQueueService activity_queue.ActivityQueueService,
) *QuestionCommon {
return &QuestionCommon{
questionRepo: questionRepo,
answerRepo: answerRepo,
voteRepo: voteRepo,
followCommon: followCommon,
tagCommon: tagCommon,
userCommon: userCommon,
collectionCommon: collectionCommon,
AnswerCommon: answerCommon,
metaService: metaService,
configService: configService,
questionRepo: questionRepo,
answerRepo: answerRepo,
voteRepo: voteRepo,
followCommon: followCommon,
tagCommon: tagCommon,
userCommon: userCommon,
collectionCommon: collectionCommon,
AnswerCommon: answerCommon,
metaService: metaService,
configService: configService,
activityQueueService: activityQueueService,
}
}
@ -502,7 +505,7 @@ func (qs *QuestionCommon) CloseQuestion(ctx context.Context, req *schema.CloseQu
return err
}
activity_queue.AddActivity(&schema.ActivityMsg{
qs.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: questionInfo.UserID,
ObjectID: questionInfo.ID,
OriginalObjectID: questionInfo.ID,

View File

@ -54,6 +54,7 @@ type QuestionService struct {
data *data.Data
emailService *export.EmailService
notificationQueueService notice_queue.NotificationQueueService
activityQueueService activity_queue.ActivityQueueService
}
func NewQuestionService(
@ -69,6 +70,7 @@ func NewQuestionService(
data *data.Data,
emailService *export.EmailService,
notificationQueueService notice_queue.NotificationQueueService,
activityQueueService activity_queue.ActivityQueueService,
) *QuestionService {
return &QuestionService{
questionRepo: questionRepo,
@ -83,6 +85,7 @@ func NewQuestionService(
data: data,
emailService: emailService,
notificationQueueService: notificationQueueService,
activityQueueService: activityQueueService,
}
}
@ -110,7 +113,7 @@ func (qs *QuestionService) CloseQuestion(ctx context.Context, req *schema.CloseQ
return err
}
activity_queue.AddActivity(&schema.ActivityMsg{
qs.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: req.UserID,
ObjectID: questionInfo.ID,
OriginalObjectID: questionInfo.ID,
@ -134,7 +137,7 @@ func (qs *QuestionService) ReopenQuestion(ctx context.Context, req *schema.Reope
if err != nil {
return err
}
activity_queue.AddActivity(&schema.ActivityMsg{
qs.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: req.UserID,
ObjectID: questionInfo.ID,
OriginalObjectID: questionInfo.ID,
@ -316,7 +319,7 @@ func (qs *QuestionService) AddQuestion(ctx context.Context, req *schema.Question
}
}
activity_queue.AddActivity(&schema.ActivityMsg{
qs.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: question.UserID,
ObjectID: question.ID,
OriginalObjectID: question.ID,
@ -385,7 +388,7 @@ func (qs *QuestionService) OperationQuestion(ctx context.Context, req *schema.Op
actMap[schema.QuestionOperationShow] = constant.ActQuestionShow
_, ok := actMap[req.Operation]
if ok {
activity_queue.AddActivity(&schema.ActivityMsg{
qs.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: req.UserID,
ObjectID: questionInfo.ID,
OriginalObjectID: questionInfo.ID,
@ -477,7 +480,7 @@ func (qs *QuestionService) RemoveQuestion(ctx context.Context, req *schema.Remov
// if err != nil {
// log.Errorf("user DeleteQuestion rank rollback error %s", err.Error())
// }
activity_queue.AddActivity(&schema.ActivityMsg{
qs.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: req.UserID,
ObjectID: questionInfo.ID,
OriginalObjectID: questionInfo.ID,
@ -826,7 +829,7 @@ func (qs *QuestionService) UpdateQuestion(ctx context.Context, req *schema.Quest
return
}
if canUpdate {
activity_queue.AddActivity(&schema.ActivityMsg{
qs.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: req.UserID,
ObjectID: question.ID,
ActivityTypeKey: constant.ActQuestionEdited,
@ -1220,7 +1223,7 @@ func (qs *QuestionService) AdminSetQuestionStatus(ctx context.Context, questionI
//if err != nil {
// log.Errorf("admin delete question then rank rollback error %s", err.Error())
//}
activity_queue.AddActivity(&schema.ActivityMsg{
qs.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: questionInfo.UserID,
ObjectID: questionInfo.ID,
OriginalObjectID: questionInfo.ID,
@ -1228,7 +1231,7 @@ func (qs *QuestionService) AdminSetQuestionStatus(ctx context.Context, questionI
})
}
if setStatus == entity.QuestionStatusAvailable && questionInfo.Status == entity.QuestionStatusClosed {
activity_queue.AddActivity(&schema.ActivityMsg{
qs.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: questionInfo.UserID,
ObjectID: questionInfo.ID,
OriginalObjectID: questionInfo.ID,
@ -1236,7 +1239,7 @@ func (qs *QuestionService) AdminSetQuestionStatus(ctx context.Context, questionI
})
}
if setStatus == entity.QuestionStatusClosed && questionInfo.Status != entity.QuestionStatusClosed {
activity_queue.AddActivity(&schema.ActivityMsg{
qs.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: questionInfo.UserID,
ObjectID: questionInfo.ID,
OriginalObjectID: questionInfo.ID,

View File

@ -38,6 +38,7 @@ type RevisionService struct {
tagRepo tag_common.TagRepo
tagCommon *tagcommon.TagCommonService
notificationQueueService notice_queue.NotificationQueueService
activityQueueService activity_queue.ActivityQueueService
}
func NewRevisionService(
@ -51,6 +52,7 @@ func NewRevisionService(
tagRepo tag_common.TagRepo,
tagCommon *tagcommon.TagCommonService,
notificationQueueService notice_queue.NotificationQueueService,
activityQueueService activity_queue.ActivityQueueService,
) *RevisionService {
return &RevisionService{
revisionRepo: revisionRepo,
@ -63,6 +65,7 @@ func NewRevisionService(
tagRepo: tagRepo,
tagCommon: tagCommon,
notificationQueueService: notificationQueueService,
activityQueueService: activityQueueService,
}
}
@ -158,7 +161,7 @@ func (rs *RevisionService) revisionAuditQuestion(ctx context.Context, revisionit
if saveerr != nil {
return saveerr
}
activity_queue.AddActivity(&schema.ActivityMsg{
rs.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: revisionitem.UserID,
ObjectID: revisionitem.ObjectID,
ActivityTypeKey: constant.ActQuestionEdited,
@ -215,7 +218,7 @@ func (rs *RevisionService) revisionAuditAnswer(ctx context.Context, revisionitem
msg.NotificationAction = constant.NotificationUpdateAnswer
rs.notificationQueueService.Send(ctx, msg)
activity_queue.AddActivity(&schema.ActivityMsg{
rs.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: revisionitem.UserID,
ObjectID: insertData.ID,
OriginalObjectID: insertData.ID,
@ -261,7 +264,7 @@ func (rs *RevisionService) revisionAuditTag(ctx context.Context, revisionitem *s
}
}
activity_queue.AddActivity(&schema.ActivityMsg{
rs.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: revisionitem.UserID,
ObjectID: taginfo.TagID,
OriginalObjectID: taginfo.TagID,

View File

@ -25,11 +25,12 @@ import (
// TagService user service
type TagService struct {
tagRepo tagcommonser.TagRepo
tagCommonService *tagcommonser.TagCommonService
revisionService *revision_common.RevisionService
followCommon activity_common.FollowRepo
siteInfoService siteinfo_common.SiteInfoCommonService
tagRepo tagcommonser.TagRepo
tagCommonService *tagcommonser.TagCommonService
revisionService *revision_common.RevisionService
followCommon activity_common.FollowRepo
siteInfoService siteinfo_common.SiteInfoCommonService
activityQueueService activity_queue.ActivityQueueService
}
// NewTagService new tag service
@ -38,13 +39,16 @@ func NewTagService(
tagCommonService *tagcommonser.TagCommonService,
revisionService *revision_common.RevisionService,
followCommon activity_common.FollowRepo,
siteInfoService siteinfo_common.SiteInfoCommonService) *TagService {
siteInfoService siteinfo_common.SiteInfoCommonService,
activityQueueService activity_queue.ActivityQueueService,
) *TagService {
return &TagService{
tagRepo: tagRepo,
tagCommonService: tagCommonService,
revisionService: revisionService,
followCommon: followCommon,
siteInfoService: siteInfoService,
tagRepo: tagRepo,
tagCommonService: tagCommonService,
revisionService: revisionService,
followCommon: followCommon,
siteInfoService: siteInfoService,
activityQueueService: activityQueueService,
}
}
@ -73,7 +77,7 @@ func (ts *TagService) RemoveTag(ctx context.Context, req *schema.RemoveTagReq) (
if err != nil {
return err
}
activity_queue.AddActivity(&schema.ActivityMsg{
ts.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: req.UserID,
ObjectID: req.TagID,
OriginalObjectID: req.TagID,
@ -298,7 +302,7 @@ func (ts *TagService) UpdateTagSynonym(ctx context.Context, req *schema.UpdateTa
if err != nil {
return err
}
activity_queue.AddActivity(&schema.ActivityMsg{
ts.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: req.UserID,
ObjectID: tag.ID,
OriginalObjectID: tag.ID,

View File

@ -57,11 +57,12 @@ type TagRelRepo interface {
// TagCommonService user service
type TagCommonService struct {
revisionService *revision_common.RevisionService
tagCommonRepo TagCommonRepo
tagRelRepo TagRelRepo
tagRepo TagRepo
siteInfoService siteinfo_common.SiteInfoCommonService
revisionService *revision_common.RevisionService
tagCommonRepo TagCommonRepo
tagRelRepo TagRelRepo
tagRepo TagRepo
siteInfoService siteinfo_common.SiteInfoCommonService
activityQueueService activity_queue.ActivityQueueService
}
// NewTagCommonService new tag service
@ -71,13 +72,15 @@ func NewTagCommonService(
tagRepo TagRepo,
revisionService *revision_common.RevisionService,
siteInfoService siteinfo_common.SiteInfoCommonService,
activityQueueService activity_queue.ActivityQueueService,
) *TagCommonService {
return &TagCommonService{
tagCommonRepo: tagCommonRepo,
tagRelRepo: tagRelRepo,
tagRepo: tagRepo,
revisionService: revisionService,
siteInfoService: siteInfoService,
tagCommonRepo: tagCommonRepo,
tagRelRepo: tagRelRepo,
tagRepo: tagRepo,
revisionService: revisionService,
siteInfoService: siteInfoService,
activityQueueService: activityQueueService,
}
}
@ -645,7 +648,7 @@ func (ts *TagCommonService) ObjectChangeTag(ctx context.Context, objectTagData *
if err != nil {
return err
}
activity_queue.AddActivity(&schema.ActivityMsg{
ts.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: objectTagData.UserID,
ObjectID: tag.ID,
OriginalObjectID: tag.ID,
@ -845,7 +848,7 @@ func (ts *TagCommonService) UpdateTag(ctx context.Context, req *schema.UpdateTag
return err
}
if canUpdate {
activity_queue.AddActivity(&schema.ActivityMsg{
ts.activityQueueService.Send(ctx, &schema.ActivityMsg{
UserID: req.UserID,
ObjectID: tagInfo.ID,
OriginalObjectID: tagInfo.ID,