refactor(notification): make notification queue as a service

This commit is contained in:
LinkinStars 2023-06-02 18:15:54 +08:00
parent 1c2151d710
commit 773d2142db
12 changed files with 228 additions and 174 deletions

View File

@ -56,6 +56,7 @@ import (
export2 "github.com/answerdev/answer/internal/service/export" export2 "github.com/answerdev/answer/internal/service/export"
"github.com/answerdev/answer/internal/service/follow" "github.com/answerdev/answer/internal/service/follow"
meta2 "github.com/answerdev/answer/internal/service/meta" meta2 "github.com/answerdev/answer/internal/service/meta"
"github.com/answerdev/answer/internal/service/notice_queue"
notification2 "github.com/answerdev/answer/internal/service/notification" notification2 "github.com/answerdev/answer/internal/service/notification"
"github.com/answerdev/answer/internal/service/notification_common" "github.com/answerdev/answer/internal/service/notification_common"
"github.com/answerdev/answer/internal/service/object_info" "github.com/answerdev/answer/internal/service/object_info"
@ -142,7 +143,8 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database,
tagCommonService := tag_common2.NewTagCommonService(tagCommonRepo, tagRelRepo, tagRepo, revisionService, siteInfoCommonService) tagCommonService := tag_common2.NewTagCommonService(tagCommonRepo, tagRelRepo, tagRepo, revisionService, siteInfoCommonService)
objService := object_info.NewObjService(answerRepo, questionRepo, commentCommonRepo, tagCommonRepo, tagCommonService) objService := object_info.NewObjService(answerRepo, questionRepo, commentCommonRepo, tagCommonRepo, tagCommonService)
voteRepo := activity_common.NewVoteRepo(dataData, activityRepo) voteRepo := activity_common.NewVoteRepo(dataData, activityRepo)
commentService := comment2.NewCommentService(commentRepo, commentCommonRepo, userCommon, objService, voteRepo, emailService, userRepo) notificationQueueService := notice_queue.NewNotificationQueueService()
commentService := comment2.NewCommentService(commentRepo, commentCommonRepo, userCommon, objService, voteRepo, emailService, userRepo, notificationQueueService)
rolePowerRelRepo := role.NewRolePowerRelRepo(dataData) rolePowerRelRepo := role.NewRolePowerRelRepo(dataData)
rolePowerRelService := role2.NewRolePowerRelService(rolePowerRelRepo, userRoleRelService) rolePowerRelService := role2.NewRolePowerRelService(rolePowerRelRepo, userRoleRelService)
rankService := rank2.NewRankService(userCommon, userRankRepo, objService, userRoleRelService, rolePowerRelService, configService) rankService := rank2.NewRankService(userCommon, userRankRepo, objService, userRoleRelService, rolePowerRelService, configService)
@ -150,7 +152,7 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database,
reportRepo := report.NewReportRepo(dataData, uniqueIDRepo) reportRepo := report.NewReportRepo(dataData, uniqueIDRepo)
reportService := report2.NewReportService(reportRepo, objService) reportService := report2.NewReportService(reportRepo, objService)
reportController := controller.NewReportController(reportService, rankService) reportController := controller.NewReportController(reportService, rankService)
serviceVoteRepo := activity.NewVoteRepo(dataData, uniqueIDRepo, configService, activityRepo, userRankRepo, voteRepo) serviceVoteRepo := activity.NewVoteRepo(dataData, uniqueIDRepo, configService, activityRepo, userRankRepo, voteRepo, notificationQueueService)
voteService := service.NewVoteService(serviceVoteRepo, uniqueIDRepo, configService, questionRepo, answerRepo, commentCommonRepo, objService) voteService := service.NewVoteService(serviceVoteRepo, uniqueIDRepo, configService, questionRepo, answerRepo, commentCommonRepo, objService)
voteController := controller.NewVoteController(voteService, rankService) voteController := controller.NewVoteController(voteService, rankService)
followRepo := activity_common.NewFollowRepo(dataData, uniqueIDRepo, activityRepo) followRepo := activity_common.NewFollowRepo(dataData, uniqueIDRepo, activityRepo)
@ -168,11 +170,11 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database,
questionCommon := questioncommon.NewQuestionCommon(questionRepo, answerRepo, voteRepo, followRepo, tagCommonService, userCommon, collectionCommon, answerCommon, metaService, configService) questionCommon := questioncommon.NewQuestionCommon(questionRepo, answerRepo, voteRepo, followRepo, tagCommonService, userCommon, collectionCommon, answerCommon, metaService, configService)
collectionService := service.NewCollectionService(collectionRepo, collectionGroupRepo, questionCommon) collectionService := service.NewCollectionService(collectionRepo, collectionGroupRepo, questionCommon)
collectionController := controller.NewCollectionController(collectionService) collectionController := controller.NewCollectionController(collectionService)
answerActivityRepo := activity.NewAnswerActivityRepo(dataData, activityRepo, userRankRepo) answerActivityRepo := activity.NewAnswerActivityRepo(dataData, activityRepo, userRankRepo, notificationQueueService)
questionActivityRepo := activity.NewQuestionActivityRepo(dataData, activityRepo, userRankRepo) questionActivityRepo := activity.NewQuestionActivityRepo(dataData, activityRepo, userRankRepo)
answerActivityService := activity2.NewAnswerActivityService(answerActivityRepo, questionActivityRepo) answerActivityService := activity2.NewAnswerActivityService(answerActivityRepo, questionActivityRepo)
questionService := service.NewQuestionService(questionRepo, tagCommonService, questionCommon, userCommon, userRepo, revisionService, metaService, collectionCommon, answerActivityService, dataData, emailService) questionService := service.NewQuestionService(questionRepo, tagCommonService, questionCommon, userCommon, userRepo, revisionService, metaService, collectionCommon, answerActivityService, dataData, emailService, notificationQueueService)
answerService := service.NewAnswerService(answerRepo, questionRepo, questionCommon, userCommon, collectionCommon, userRepo, revisionService, answerActivityService, answerCommon, voteRepo, emailService, userRoleRelService) answerService := service.NewAnswerService(answerRepo, questionRepo, questionCommon, userCommon, collectionCommon, userRepo, revisionService, answerActivityService, answerCommon, voteRepo, emailService, userRoleRelService, notificationQueueService)
questionController := controller.NewQuestionController(questionService, answerService, rankService, siteInfoCommonService) questionController := controller.NewQuestionController(questionService, answerService, rankService, siteInfoCommonService)
dashboardService := dashboard.NewDashboardService(questionRepo, answerRepo, commentCommonRepo, voteRepo, userRepo, reportRepo, configService, siteInfoCommonService, serviceConf, dataData) dashboardService := dashboard.NewDashboardService(questionRepo, answerRepo, commentCommonRepo, voteRepo, userRepo, reportRepo, configService, siteInfoCommonService, serviceConf, dataData)
answerController := controller.NewAnswerController(answerService, rankService, dashboardService) answerController := controller.NewAnswerController(answerService, rankService, dashboardService)
@ -180,10 +182,10 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database,
searchRepo := search_common.NewSearchRepo(dataData, uniqueIDRepo, userCommon) searchRepo := search_common.NewSearchRepo(dataData, uniqueIDRepo, userCommon)
searchService := service.NewSearchService(searchParser, searchRepo) searchService := service.NewSearchService(searchParser, searchRepo)
searchController := controller.NewSearchController(searchService) searchController := controller.NewSearchController(searchService)
serviceRevisionService := service.NewRevisionService(revisionRepo, userCommon, questionCommon, answerService, objService, questionRepo, answerRepo, tagRepo, tagCommonService) serviceRevisionService := service.NewRevisionService(revisionRepo, userCommon, questionCommon, answerService, objService, questionRepo, answerRepo, tagRepo, tagCommonService, notificationQueueService)
revisionController := controller.NewRevisionController(serviceRevisionService, rankService) revisionController := controller.NewRevisionController(serviceRevisionService, rankService)
rankController := controller.NewRankController(rankService) rankController := controller.NewRankController(rankService)
reportHandle := report_handle_admin.NewReportHandle(questionCommon, commentRepo, configService) reportHandle := report_handle_admin.NewReportHandle(questionCommon, commentRepo, configService, notificationQueueService)
reportAdminService := report_admin.NewReportAdminService(reportRepo, userCommon, answerRepo, questionRepo, commentCommonRepo, reportHandle, configService, objService) reportAdminService := report_admin.NewReportAdminService(reportRepo, userCommon, answerRepo, questionRepo, commentCommonRepo, reportHandle, configService, objService)
controller_adminReportController := controller_admin.NewReportController(reportAdminService) controller_adminReportController := controller_admin.NewReportController(reportAdminService)
userAdminRepo := user.NewUserAdminRepo(dataData, authRepo) userAdminRepo := user.NewUserAdminRepo(dataData, authRepo)
@ -197,7 +199,7 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database,
siteInfoController := controller_admin.NewSiteInfoController(siteInfoService) siteInfoController := controller_admin.NewSiteInfoController(siteInfoService)
controllerSiteInfoController := controller.NewSiteInfoController(siteInfoCommonService) controllerSiteInfoController := controller.NewSiteInfoController(siteInfoCommonService)
notificationRepo := notification.NewNotificationRepo(dataData) notificationRepo := notification.NewNotificationRepo(dataData)
notificationCommon := notificationcommon.NewNotificationCommon(dataData, notificationRepo, userCommon, activityRepo, followRepo, objService) notificationCommon := notificationcommon.NewNotificationCommon(dataData, notificationRepo, userCommon, activityRepo, followRepo, objService, notificationQueueService)
notificationService := notification2.NewNotificationService(dataData, notificationRepo, notificationCommon, revisionService) notificationService := notification2.NewNotificationService(dataData, notificationRepo, notificationCommon, revisionService)
notificationController := controller.NewNotificationController(notificationService, rankService) notificationController := controller.NewNotificationController(notificationService, rankService)
dashboardController := controller.NewDashboardController(dashboardService) dashboardController := controller.NewDashboardController(dashboardService)

View File

@ -19,32 +19,30 @@ import (
"xorm.io/xorm" "xorm.io/xorm"
) )
var (
acceptActionList = []string{constant.ActAccept, constant.ActAccepted}
)
// AnswerActivityRepo answer accepted // AnswerActivityRepo answer accepted
type AnswerActivityRepo struct { type AnswerActivityRepo struct {
data *data.Data data *data.Data
activityRepo activity_common.ActivityRepo activityRepo activity_common.ActivityRepo
userRankRepo rank.UserRankRepo userRankRepo rank.UserRankRepo
notificationQueueService notice_queue.NotificationQueueService
} }
const (
acceptAction = "accept"
acceptedAction = "accepted"
)
var (
acceptActionList = []string{acceptAction, acceptedAction}
)
// NewAnswerActivityRepo new repository // NewAnswerActivityRepo new repository
func NewAnswerActivityRepo( func NewAnswerActivityRepo(
data *data.Data, data *data.Data,
activityRepo activity_common.ActivityRepo, activityRepo activity_common.ActivityRepo,
userRankRepo rank.UserRankRepo, userRankRepo rank.UserRankRepo,
notificationQueueService notice_queue.NotificationQueueService,
) activity.AnswerActivityRepo { ) activity.AnswerActivityRepo {
return &AnswerActivityRepo{ return &AnswerActivityRepo{
data: data, data: data,
activityRepo: activityRepo, activityRepo: activityRepo,
userRankRepo: userRankRepo, userRankRepo: userRankRepo,
notificationQueueService: notificationQueueService,
} }
} }
@ -139,7 +137,7 @@ func (ar *AnswerActivityRepo) AcceptAnswer(ctx context.Context,
Rank: deltaRank, Rank: deltaRank,
HasRank: hasRank, HasRank: hasRank,
} }
if action == acceptAction { if action == constant.ActAccept {
addActivity.UserID = questionUserID addActivity.UserID = questionUserID
addActivity.TriggerUserID = converter.StringToInt64(answerUserID) addActivity.TriggerUserID = converter.StringToInt64(answerUserID)
addActivity.OriginalObjectID = questionObjID // if activity is 'accept' means this question is accept the answer. addActivity.OriginalObjectID = questionObjID // if activity is 'accept' means this question is accept the answer.
@ -209,7 +207,7 @@ func (ar *AnswerActivityRepo) AcceptAnswer(ctx context.Context,
msg.ObjectType = constant.AnswerObjectType msg.ObjectType = constant.AnswerObjectType
} }
if msg.TriggerUserID != msg.ReceiverUserID { if msg.TriggerUserID != msg.ReceiverUserID {
notice_queue.AddNotification(msg) ar.notificationQueueService.Send(ctx, msg)
} }
} }
@ -223,7 +221,7 @@ func (ar *AnswerActivityRepo) AcceptAnswer(ctx context.Context,
msg.TriggerUserID = questionUserID msg.TriggerUserID = questionUserID
msg.ObjectType = constant.AnswerObjectType msg.ObjectType = constant.AnswerObjectType
msg.NotificationAction = constant.NotificationAcceptAnswer msg.NotificationAction = constant.NotificationAcceptAnswer
notice_queue.AddNotification(msg) ar.notificationQueueService.Send(ctx, msg)
} }
} }
return err return err
@ -246,7 +244,7 @@ func (ar *AnswerActivityRepo) CancelAcceptAnswer(ctx context.Context,
Rank: -deltaRank, Rank: -deltaRank,
HasRank: hasRank, HasRank: hasRank,
} }
if action == acceptAction { if action == constant.ActAccept {
addActivity.UserID = questionUserID addActivity.UserID = questionUserID
addActivity.OriginalObjectID = questionObjID addActivity.OriginalObjectID = questionObjID
} else { } else {
@ -303,7 +301,7 @@ func (ar *AnswerActivityRepo) CancelAcceptAnswer(ctx context.Context,
msg.ObjectType = constant.AnswerObjectType msg.ObjectType = constant.AnswerObjectType
} }
if msg.TriggerUserID != msg.ReceiverUserID { if msg.TriggerUserID != msg.ReceiverUserID {
notice_queue.AddNotification(msg) ar.notificationQueueService.Send(ctx, msg)
} }
} }
return err return err

View File

@ -6,11 +6,11 @@ import (
"time" "time"
"github.com/answerdev/answer/internal/base/constant" "github.com/answerdev/answer/internal/base/constant"
"github.com/answerdev/answer/internal/service/notice_queue"
"github.com/answerdev/answer/pkg/converter" "github.com/answerdev/answer/pkg/converter"
"github.com/answerdev/answer/internal/base/pager" "github.com/answerdev/answer/internal/base/pager"
"github.com/answerdev/answer/internal/service/config" "github.com/answerdev/answer/internal/service/config"
"github.com/answerdev/answer/internal/service/notice_queue"
"github.com/answerdev/answer/internal/service/rank" "github.com/answerdev/answer/internal/service/rank"
"github.com/answerdev/answer/pkg/obj" "github.com/answerdev/answer/pkg/obj"
@ -36,6 +36,7 @@ type VoteRepo struct {
activityRepo activity_common.ActivityRepo activityRepo activity_common.ActivityRepo
userRankRepo rank.UserRankRepo userRankRepo rank.UserRankRepo
voteCommon activity_common.VoteRepo voteCommon activity_common.VoteRepo
notificationQueueService notice_queue.NotificationQueueService
} }
// NewVoteRepo new repository // NewVoteRepo new repository
@ -46,6 +47,7 @@ func NewVoteRepo(
activityRepo activity_common.ActivityRepo, activityRepo activity_common.ActivityRepo,
userRankRepo rank.UserRankRepo, userRankRepo rank.UserRankRepo,
voteCommon activity_common.VoteRepo, voteCommon activity_common.VoteRepo,
notificationQueueService notice_queue.NotificationQueueService,
) service.VoteRepo { ) service.VoteRepo {
return &VoteRepo{ return &VoteRepo{
data: data, data: data,
@ -54,6 +56,7 @@ func NewVoteRepo(
activityRepo: activityRepo, activityRepo: activityRepo,
userRankRepo: userRankRepo, userRankRepo: userRankRepo,
voteCommon: voteCommon, voteCommon: voteCommon,
notificationQueueService: notificationQueueService,
} }
} }
@ -177,7 +180,7 @@ func (vr *VoteRepo) vote(ctx context.Context, objectID string, userID, objectUse
vr.sendNotification(ctx, activityUserID, objectUserID, objectID) vr.sendNotification(ctx, activityUserID, objectUserID, objectID)
} }
if sendInboxNotification { if sendInboxNotification {
vr.sendVoteInboxNotification(userID, objectUserID, objectID, upVote) vr.sendVoteInboxNotification(ctx, userID, objectUserID, objectID, upVote)
} }
return return
} }
@ -451,10 +454,10 @@ func (vr *VoteRepo) sendNotification(ctx context.Context, activityUserID, object
ObjectID: objectID, ObjectID: objectID,
ObjectType: objectType, ObjectType: objectType,
} }
notice_queue.AddNotification(msg) vr.notificationQueueService.Send(ctx, msg)
} }
func (vr *VoteRepo) sendVoteInboxNotification(triggerUserID, receiverUserID, objectID string, upvote bool) { func (vr *VoteRepo) sendVoteInboxNotification(ctx context.Context, triggerUserID, receiverUserID, objectID string, upvote bool) {
if triggerUserID == receiverUserID { if triggerUserID == receiverUserID {
return return
} }
@ -487,6 +490,6 @@ func (vr *VoteRepo) sendVoteInboxNotification(triggerUserID, receiverUserID, obj
} }
} }
if len(msg.NotificationAction) > 0 { if len(msg.NotificationAction) > 0 {
notice_queue.AddNotification(msg) vr.notificationQueueService.Send(ctx, msg)
} }
} }

View File

@ -63,6 +63,8 @@ type NotificationMsg struct {
NotificationAction string NotificationAction string
// if true no need to send notification to all followers // if true no need to send notification to all followers
NoNeedPushAllFollow bool NoNeedPushAllFollow bool
// extra info
ExtraInfo map[string]string
} }
type ObjectInfo struct { type ObjectInfo struct {

View File

@ -43,6 +43,7 @@ type AnswerService struct {
voteRepo activity_common.VoteRepo voteRepo activity_common.VoteRepo
emailService *export.EmailService emailService *export.EmailService
roleService *role.UserRoleRelService roleService *role.UserRoleRelService
notificationQueueService notice_queue.NotificationQueueService
} }
func NewAnswerService( func NewAnswerService(
@ -58,6 +59,7 @@ func NewAnswerService(
voteRepo activity_common.VoteRepo, voteRepo activity_common.VoteRepo,
emailService *export.EmailService, emailService *export.EmailService,
roleService *role.UserRoleRelService, roleService *role.UserRoleRelService,
notificationQueueService notice_queue.NotificationQueueService,
) *AnswerService { ) *AnswerService {
return &AnswerService{ return &AnswerService{
answerRepo: answerRepo, answerRepo: answerRepo,
@ -72,6 +74,7 @@ func NewAnswerService(
voteRepo: voteRepo, voteRepo: voteRepo,
emailService: emailService, emailService: emailService,
roleService: roleService, roleService: roleService,
notificationQueueService: notificationQueueService,
} }
} }
@ -481,7 +484,7 @@ func (as *AnswerService) AdminSetAnswerStatus(ctx context.Context, req *schema.A
msg.TriggerUserID = answerInfo.UserID msg.TriggerUserID = answerInfo.UserID
msg.ObjectType = constant.AnswerObjectType msg.ObjectType = constant.AnswerObjectType
msg.NotificationAction = constant.NotificationYourAnswerWasDeleted msg.NotificationAction = constant.NotificationYourAnswerWasDeleted
notice_queue.AddNotification(msg) as.notificationQueueService.Send(ctx, msg)
return nil return nil
} }
@ -571,7 +574,7 @@ func (as *AnswerService) notificationUpdateAnswer(ctx context.Context, questionU
} }
msg.ObjectType = constant.AnswerObjectType msg.ObjectType = constant.AnswerObjectType
msg.NotificationAction = constant.NotificationUpdateAnswer msg.NotificationAction = constant.NotificationUpdateAnswer
notice_queue.AddNotification(msg) as.notificationQueueService.Send(ctx, msg)
} }
func (as *AnswerService) notificationAnswerTheQuestion(ctx context.Context, func (as *AnswerService) notificationAnswerTheQuestion(ctx context.Context,
@ -588,7 +591,7 @@ func (as *AnswerService) notificationAnswerTheQuestion(ctx context.Context,
} }
msg.ObjectType = constant.AnswerObjectType msg.ObjectType = constant.AnswerObjectType
msg.NotificationAction = constant.NotificationAnswerTheQuestion msg.NotificationAction = constant.NotificationAnswerTheQuestion
notice_queue.AddNotification(msg) as.notificationQueueService.Send(ctx, msg)
userInfo, exist, err := as.userRepo.GetByUserID(ctx, questionUserID) userInfo, exist, err := as.userRepo.GetByUserID(ctx, questionUserID)
if err != nil { if err != nil {

View File

@ -65,6 +65,7 @@ type CommentService struct {
objectInfoService *object_info.ObjService objectInfoService *object_info.ObjService
emailService *export.EmailService emailService *export.EmailService
userRepo usercommon.UserRepo userRepo usercommon.UserRepo
notificationQueueService notice_queue.NotificationQueueService
} }
// NewCommentService new comment service // NewCommentService new comment service
@ -76,6 +77,7 @@ func NewCommentService(
voteCommon activity_common.VoteRepo, voteCommon activity_common.VoteRepo,
emailService *export.EmailService, emailService *export.EmailService,
userRepo usercommon.UserRepo, userRepo usercommon.UserRepo,
notificationQueueService notice_queue.NotificationQueueService,
) *CommentService { ) *CommentService {
return &CommentService{ return &CommentService{
commentRepo: commentRepo, commentRepo: commentRepo,
@ -85,6 +87,7 @@ func NewCommentService(
objectInfoService: objectInfoService, objectInfoService: objectInfoService,
emailService: emailService, emailService: emailService,
userRepo: userRepo, userRepo: userRepo,
notificationQueueService: notificationQueueService,
} }
} }
@ -476,7 +479,7 @@ func (cs *CommentService) notificationQuestionComment(ctx context.Context, quest
} }
msg.ObjectType = constant.CommentObjectType msg.ObjectType = constant.CommentObjectType
msg.NotificationAction = constant.NotificationCommentQuestion msg.NotificationAction = constant.NotificationCommentQuestion
notice_queue.AddNotification(msg) cs.notificationQueueService.Send(ctx, msg)
receiverUserInfo, exist, err := cs.userRepo.GetByUserID(ctx, questionUserID) receiverUserInfo, exist, err := cs.userRepo.GetByUserID(ctx, questionUserID)
if err != nil { if err != nil {
@ -535,7 +538,7 @@ func (cs *CommentService) notificationAnswerComment(ctx context.Context,
} }
msg.ObjectType = constant.CommentObjectType msg.ObjectType = constant.CommentObjectType
msg.NotificationAction = constant.NotificationCommentAnswer msg.NotificationAction = constant.NotificationCommentAnswer
notice_queue.AddNotification(msg) cs.notificationQueueService.Send(ctx, msg)
receiverUserInfo, exist, err := cs.userRepo.GetByUserID(ctx, answerUserID) receiverUserInfo, exist, err := cs.userRepo.GetByUserID(ctx, answerUserID)
if err != nil { if err != nil {
@ -591,7 +594,7 @@ func (cs *CommentService) notificationCommentReply(ctx context.Context, replyUse
} }
msg.ObjectType = constant.CommentObjectType msg.ObjectType = constant.CommentObjectType
msg.NotificationAction = constant.NotificationReplyToYou msg.NotificationAction = constant.NotificationReplyToYou
notice_queue.AddNotification(msg) cs.notificationQueueService.Send(ctx, msg)
} }
func (cs *CommentService) notificationMention( func (cs *CommentService) notificationMention(
@ -612,7 +615,7 @@ func (cs *CommentService) notificationMention(
} }
msg.ObjectType = constant.CommentObjectType msg.ObjectType = constant.CommentObjectType
msg.NotificationAction = constant.NotificationMentionYou msg.NotificationAction = constant.NotificationMentionYou
notice_queue.AddNotification(msg) cs.notificationQueueService.Send(ctx, msg)
alreadyNotifiedUserIDs = append(alreadyNotifiedUserIDs, userInfo.ID) alreadyNotifiedUserIDs = append(alreadyNotifiedUserIDs, userInfo.ID)
} }
} }

View File

@ -1,13 +1,53 @@
package notice_queue package notice_queue
import ( import (
"context"
"github.com/answerdev/answer/internal/schema" "github.com/answerdev/answer/internal/schema"
"github.com/segmentfault/pacman/log"
) )
var ( type NotificationQueueService interface {
NotificationQueue = make(chan *schema.NotificationMsg, 128) Send(ctx context.Context, msg *schema.NotificationMsg)
) RegisterHandler(handler func(ctx context.Context, msg *schema.NotificationMsg) error)
}
func AddNotification(msg *schema.NotificationMsg) {
NotificationQueue <- msg type notificationQueueService struct {
Queue chan *schema.NotificationMsg
Handler func(ctx context.Context, msg *schema.NotificationMsg) error
}
func (ns *notificationQueueService) Send(ctx context.Context, msg *schema.NotificationMsg) {
ns.Queue <- msg
}
func (ns *notificationQueueService) RegisterHandler(
handler func(ctx context.Context, msg *schema.NotificationMsg) error) {
ns.Handler = handler
}
func (ns *notificationQueueService) working() {
go func() {
for msg := range ns.Queue {
log.Debugf("received notification %+v", msg)
if ns.Handler == nil {
log.Warnf("no handler for notification")
continue
}
if err := ns.Handler(context.Background(), msg); err != nil {
log.Error(err)
}
}
}()
}
// NewNotificationQueueService create a new notification queue service
func NewNotificationQueueService() NotificationQueueService {
ns := &notificationQueueService{}
ns.Queue = make(chan *schema.NotificationMsg, 128)
ns.working()
return ns
}
func AddNotification2(msg *schema.NotificationMsg) {
} }

View File

@ -39,6 +39,7 @@ type NotificationCommon struct {
followRepo activity_common.FollowRepo followRepo activity_common.FollowRepo
userCommon *usercommon.UserCommon userCommon *usercommon.UserCommon
objectInfoService *object_info.ObjService objectInfoService *object_info.ObjService
notificationQueueService notice_queue.NotificationQueueService
} }
func NewNotificationCommon( func NewNotificationCommon(
@ -48,6 +49,7 @@ func NewNotificationCommon(
activityRepo activity_common.ActivityRepo, activityRepo activity_common.ActivityRepo,
followRepo activity_common.FollowRepo, followRepo activity_common.FollowRepo,
objectInfoService *object_info.ObjService, objectInfoService *object_info.ObjService,
notificationQueueService notice_queue.NotificationQueueService,
) *NotificationCommon { ) *NotificationCommon {
notification := &NotificationCommon{ notification := &NotificationCommon{
data: data, data: data,
@ -56,23 +58,12 @@ func NewNotificationCommon(
followRepo: followRepo, followRepo: followRepo,
userCommon: userCommon, userCommon: userCommon,
objectInfoService: objectInfoService, objectInfoService: objectInfoService,
notificationQueueService: notificationQueueService,
} }
notification.HandleNotification() notificationQueueService.RegisterHandler(notification.AddNotification)
return notification return notification
} }
func (ns *NotificationCommon) HandleNotification() {
go func() {
for msg := range notice_queue.NotificationQueue {
log.Debugf("received notification %+v", msg)
err := ns.AddNotification(context.TODO(), msg)
if err != nil {
log.Error(err)
}
}
}()
}
// AddNotification // AddNotification
// need set // need set
// LoginUserID // LoginUserID
@ -213,6 +204,6 @@ func (ns *NotificationCommon) SendNotificationToAllFollower(ctx context.Context,
t.ReceiverUserID = userID t.ReceiverUserID = userID
t.TriggerUserID = msg.TriggerUserID t.TriggerUserID = msg.TriggerUserID
t.NoNeedPushAllFollow = true t.NoNeedPushAllFollow = true
notice_queue.AddNotification(t) ns.notificationQueueService.Send(ctx, t)
} }
} }

View File

@ -14,6 +14,7 @@ import (
"github.com/answerdev/answer/internal/service/export" "github.com/answerdev/answer/internal/service/export"
"github.com/answerdev/answer/internal/service/follow" "github.com/answerdev/answer/internal/service/follow"
"github.com/answerdev/answer/internal/service/meta" "github.com/answerdev/answer/internal/service/meta"
"github.com/answerdev/answer/internal/service/notice_queue"
"github.com/answerdev/answer/internal/service/notification" "github.com/answerdev/answer/internal/service/notification"
notficationcommon "github.com/answerdev/answer/internal/service/notification_common" notficationcommon "github.com/answerdev/answer/internal/service/notification_common"
"github.com/answerdev/answer/internal/service/object_info" "github.com/answerdev/answer/internal/service/object_info"
@ -86,4 +87,5 @@ var ProviderSetService = wire.NewSet(
user_external_login.NewUserCenterLoginService, user_external_login.NewUserCenterLoginService,
plugin_common.NewPluginCommonService, plugin_common.NewPluginCommonService,
config.NewConfigService, config.NewConfigService,
notice_queue.NewNotificationQueueService,
) )

View File

@ -53,6 +53,7 @@ type QuestionService struct {
answerActivityService *activity.AnswerActivityService answerActivityService *activity.AnswerActivityService
data *data.Data data *data.Data
emailService *export.EmailService emailService *export.EmailService
notificationQueueService notice_queue.NotificationQueueService
} }
func NewQuestionService( func NewQuestionService(
@ -67,6 +68,7 @@ func NewQuestionService(
answerActivityService *activity.AnswerActivityService, answerActivityService *activity.AnswerActivityService,
data *data.Data, data *data.Data,
emailService *export.EmailService, emailService *export.EmailService,
notificationQueueService notice_queue.NotificationQueueService,
) *QuestionService { ) *QuestionService {
return &QuestionService{ return &QuestionService{
questionRepo: questionRepo, questionRepo: questionRepo,
@ -80,6 +82,7 @@ func NewQuestionService(
answerActivityService: answerActivityService, answerActivityService: answerActivityService,
data: data, data: data,
emailService: emailService, emailService: emailService,
notificationQueueService: notificationQueueService,
} }
} }
@ -633,7 +636,7 @@ func (qs *QuestionService) notificationInviteUser(
} }
msg.ObjectType = constant.QuestionObjectType msg.ObjectType = constant.QuestionObjectType
msg.NotificationAction = constant.NotificationInvitedYouToAnswer msg.NotificationAction = constant.NotificationInvitedYouToAnswer
notice_queue.AddNotification(msg) qs.notificationQueueService.Send(ctx, msg)
userInfo, ok := invitee[userID] userInfo, ok := invitee[userID]
if !ok { if !ok {
@ -1247,7 +1250,7 @@ func (qs *QuestionService) AdminSetQuestionStatus(ctx context.Context, questionI
msg.TriggerUserID = questionInfo.UserID msg.TriggerUserID = questionInfo.UserID
msg.ObjectType = constant.QuestionObjectType msg.ObjectType = constant.QuestionObjectType
msg.NotificationAction = constant.NotificationYourQuestionWasDeleted msg.NotificationAction = constant.NotificationYourQuestionWasDeleted
notice_queue.AddNotification(msg) qs.notificationQueueService.Send(ctx, msg)
return nil return nil
} }

View File

@ -4,12 +4,12 @@ import (
"context" "context"
"github.com/answerdev/answer/internal/service/config" "github.com/answerdev/answer/internal/service/config"
"github.com/answerdev/answer/internal/service/notice_queue"
"github.com/answerdev/answer/internal/base/constant" "github.com/answerdev/answer/internal/base/constant"
"github.com/answerdev/answer/internal/entity" "github.com/answerdev/answer/internal/entity"
"github.com/answerdev/answer/internal/schema" "github.com/answerdev/answer/internal/schema"
"github.com/answerdev/answer/internal/service/comment" "github.com/answerdev/answer/internal/service/comment"
"github.com/answerdev/answer/internal/service/notice_queue"
questioncommon "github.com/answerdev/answer/internal/service/question_common" questioncommon "github.com/answerdev/answer/internal/service/question_common"
"github.com/answerdev/answer/pkg/obj" "github.com/answerdev/answer/pkg/obj"
) )
@ -18,16 +18,20 @@ type ReportHandle struct {
questionCommon *questioncommon.QuestionCommon questionCommon *questioncommon.QuestionCommon
commentRepo comment.CommentRepo commentRepo comment.CommentRepo
configService *config.ConfigService configService *config.ConfigService
notificationQueueService notice_queue.NotificationQueueService
} }
func NewReportHandle( func NewReportHandle(
questionCommon *questioncommon.QuestionCommon, questionCommon *questioncommon.QuestionCommon,
commentRepo comment.CommentRepo, commentRepo comment.CommentRepo,
configService *config.ConfigService) *ReportHandle { configService *config.ConfigService,
notificationQueueService notice_queue.NotificationQueueService,
) *ReportHandle {
return &ReportHandle{ return &ReportHandle{
questionCommon: questionCommon, questionCommon: questionCommon,
commentRepo: commentRepo, commentRepo: commentRepo,
configService: configService, configService: configService,
notificationQueueService: notificationQueueService,
} }
} }
@ -88,5 +92,5 @@ func (rh *ReportHandle) sendNotification(ctx context.Context, reportedUserID, ob
ObjectType: constant.ReportObjectType, ObjectType: constant.ReportObjectType,
NotificationAction: notificationAction, NotificationAction: notificationAction,
} }
notice_queue.AddNotification(msg) rh.notificationQueueService.Send(ctx, msg)
} }

View File

@ -37,6 +37,7 @@ type RevisionService struct {
answerRepo answercommon.AnswerRepo answerRepo answercommon.AnswerRepo
tagRepo tag_common.TagRepo tagRepo tag_common.TagRepo
tagCommon *tagcommon.TagCommonService tagCommon *tagcommon.TagCommonService
notificationQueueService notice_queue.NotificationQueueService
} }
func NewRevisionService( func NewRevisionService(
@ -49,6 +50,7 @@ func NewRevisionService(
answerRepo answercommon.AnswerRepo, answerRepo answercommon.AnswerRepo,
tagRepo tag_common.TagRepo, tagRepo tag_common.TagRepo,
tagCommon *tagcommon.TagCommonService, tagCommon *tagcommon.TagCommonService,
notificationQueueService notice_queue.NotificationQueueService,
) *RevisionService { ) *RevisionService {
return &RevisionService{ return &RevisionService{
revisionRepo: revisionRepo, revisionRepo: revisionRepo,
@ -60,6 +62,7 @@ func NewRevisionService(
answerRepo: answerRepo, answerRepo: answerRepo,
tagRepo: tagRepo, tagRepo: tagRepo,
tagCommon: tagCommon, tagCommon: tagCommon,
notificationQueueService: notificationQueueService,
} }
} }
@ -210,7 +213,7 @@ func (rs *RevisionService) revisionAuditAnswer(ctx context.Context, revisionitem
} }
msg.ObjectType = constant.AnswerObjectType msg.ObjectType = constant.AnswerObjectType
msg.NotificationAction = constant.NotificationUpdateAnswer msg.NotificationAction = constant.NotificationUpdateAnswer
notice_queue.AddNotification(msg) rs.notificationQueueService.Send(ctx, msg)
activity_queue.AddActivity(&schema.ActivityMsg{ activity_queue.AddActivity(&schema.ActivityMsg{
UserID: revisionitem.UserID, UserID: revisionitem.UserID,