package ws import ( "encoding/base64" "encoding/json" "gadmin/config" "gadmin/utility/token" jsoniter "github.com/json-iterator/go" "net/http" "sync" "time" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "github.com/sirupsen/logrus" "github.com/spf13/cast" ) type Msg struct { Type int64 `json:"type"` // 1 日志 2 用户查询 Id int64 `json:"id"` Code int `json:"code"` Msg string `json:"msg"` CID string `json:"cid"` // clientId Extra map[string]interface{} `json:"extra,omitempty"` } func (msg *Msg) Bytes() []byte { data, _ := json.Marshal(msg) return data } type ReqMsg struct { Type int `json:"type"` Data string `json:"data"` // } type Client struct { Id int64 c *websocket.Conn msgChan chan Msg msgReq chan ReqMsg lastHeart int64 isBroken bool claims *token.UserClaims Lock sync.Mutex } func NewClient(id int64, conn *websocket.Conn) *Client { cli := &Client{ Id: id, c: conn, msgChan: make(chan Msg, 100), msgReq: make(chan ReqMsg, 100), lastHeart: time.Now().Unix(), } return cli } func (client *Client) Handler() { go func() { for !client.isBroken { _, data, err := client.c.ReadMessage() if err != nil { logrus.Errorln("接收消息:", err.Error()) client.isBroken = true break } var msg ReqMsg //logrus.Warn("接收到原始消息:", string(data)) if err = json.Unmarshal(data, &msg); err != nil { logrus.Warn("Handler json.Unmarshal err:", err.Error()) break } client.msgReq <- msg } }() for !client.isBroken { select { case msg, ok := <-client.msgReq: // 接收 if ok { client.HandlerReqMsg(msg) } case msg, ok := <-client.msgChan: // 发送 if ok { err := client.WriteResp(msg.Bytes()) if err != nil { logrus.Error("写入:", err.Error()) client.isBroken = true } } } } } var clients = make(map[int64]*Client) var lock sync.Mutex var BoardCast = make(chan Msg, 100) // 广播通道 func NotifyBoardCast() { for { select { case msg, ok := <-BoardCast: logrus.Infof("new NotifyBoardCast :%+v", msg) if ok { for _, client := range clients { logrus.Info("client ", client.Id, " ,conn:", client.isBroken) if client.isBroken { client.Lock.Lock() client.c.Close() close(client.msgChan) close(client.msgReq) client.Lock.Unlock() lock.Lock() delete(clients, client.Id) lock.Unlock() continue } if msg.CID != "" { if client.Id == cast.ToInt64(msg.CID) { logrus.Infoln("NotifyBoardCast ", msg) client.msgChan <- msg break } } else { client.msgChan <- msg } } } default: time.Sleep(1 * time.Second) } } } var upgrader = websocket.Upgrader{ // 解决跨域问题 CheckOrigin: func(r *http.Request) bool { return true }, } // use default options func Websocket(ctx *gin.Context) { c, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil) if err != nil { logrus.Error("upgrade:", err) return } //t := token.GetAuthorization(ctx) //if t == "" { // logrus.Error("没有登录") // //return //} //claims, err := token.ParseToken(t) //if err != nil { // logrus.Error("token.ParseToken:", err.Error()) // //return //} //if claims == nil { // claims = &token.UserClaims{} //} encodeToken := token.GetAuthorization(ctx) if encodeToken == "" { logrus.Error("没有登录") //return } bytesT, err := base64.URLEncoding.DecodeString(encodeToken) if err != nil { logrus.Error("base64.URLEncoding.DecodeString:", err.Error()) } t := string(bytesT) tokenKey := config.GetTokenKey(t) if config.TokenRedis.Exists(tokenKey).Val() == 0 { logrus.Error("没有登录") //return } userStr := config.TokenRedis.Get(tokenKey).Val() claims := new(token.UserClaims) if err := jsoniter.UnmarshalFromString(userStr, claims); err != nil { logrus.Error("jsoniter.UnmarshalFromString:", err.Error()) //return } if claims.ID == 0 { logrus.Error("没有登录") } now := time.Now().UnixNano() / 10e3 client := &Client{ Id: now, c: c, msgChan: make(chan Msg, 100), msgReq: make(chan ReqMsg, 100), lastHeart: time.Now().Unix(), claims: claims, } lock.Lock() clients[now] = client lock.Unlock() logrus.Infof("新用户接入ws, sid:%v, ip:%v, id:%v, userName:%v", client.Id, c.RemoteAddr(), client.claims.ID, client.claims.UserName) defer c.Close() client.Handler() } // GetClientsByAdminId 通过管理员ID查找在线的ws func GetClientsByAdminId(adminId int64) (checkedClients []*Client) { lock.Lock() defer lock.Unlock() for _, client := range clients { if client.claims.ID == adminId { checkedClients = append(checkedClients, client) } } return checkedClients } // SendToAdmin 发送消息给指定管理员 func SendToAdmin(adminId int64, msg Msg) { checkedClients := GetClientsByAdminId(adminId) if len(checkedClients) == 0 { logrus.Warnf("当前管理员没有在线的ws admin_id:%v", adminId) return } for _, client := range checkedClients { logrus.Warnf("SendToAdmin id:%v, client.claims:%+v", client.Id, client.claims) client.WriteResp(msg.Bytes()) } }