ws.go 5.2 KB


  1. package ws
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "gadmin/config"
  6. "gadmin/utility/token"
  7. jsoniter "github.com/json-iterator/go"
  8. "net/http"
  9. "sync"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. "github.com/gorilla/websocket"
  13. "github.com/sirupsen/logrus"
  14. "github.com/spf13/cast"
  15. )
  16. type Msg struct {
  17. Type int64 `json:"type"` // 1 日志 2 用户查询
  18. Id int64 `json:"id"`
  19. Code int `json:"code"`
  20. Msg string `json:"msg"`
  21. CID string `json:"cid"` // clientId
  22. Extra map[string]interface{} `json:"extra,omitempty"`
  23. }
  24. func (msg *Msg) Bytes() []byte {
  25. data, _ := json.Marshal(msg)
  26. return data
  27. }
  28. type ReqMsg struct {
  29. Type int `json:"type"`
  30. Data string `json:"data"` //
  31. }
  32. type Client struct {
  33. Id int64
  34. c *websocket.Conn
  35. msgChan chan Msg
  36. msgReq chan ReqMsg
  37. lastHeart int64
  38. isBroken bool
  39. claims *token.UserClaims
  40. Lock sync.Mutex
  41. }
  42. func NewClient(id int64, conn *websocket.Conn) *Client {
  43. cli := &Client{
  44. Id: id,
  45. c: conn,
  46. msgChan: make(chan Msg, 100),
  47. msgReq: make(chan ReqMsg, 100),
  48. lastHeart: time.Now().Unix(),
  49. }
  50. return cli
  51. }
  52. func (client *Client) Handler() {
  53. go func() {
  54. for !client.isBroken {
  55. _, data, err := client.c.ReadMessage()
  56. if err != nil {
  57. logrus.Errorln("接收消息:", err.Error())
  58. client.isBroken = true
  59. break
  60. }
  61. var msg ReqMsg
  62. //logrus.Warn("接收到原始消息:", string(data))
  63. if err = json.Unmarshal(data, &msg); err != nil {
  64. logrus.Warn("Handler json.Unmarshal err:", err.Error())
  65. break
  66. }
  67. client.msgReq <- msg
  68. }
  69. }()
  70. for !client.isBroken {
  71. select {
  72. case msg, ok := <-client.msgReq: // 接收
  73. if ok {
  74. client.HandlerReqMsg(msg)
  75. }
  76. case msg, ok := <-client.msgChan: // 发送
  77. if ok {
  78. err := client.WriteResp(msg.Bytes())
  79. if err != nil {
  80. logrus.Error("写入:", err.Error())
  81. client.isBroken = true
  82. }
  83. }
  84. }
  85. }
  86. }
  87. var clients = make(map[int64]*Client)
  88. var lock sync.Mutex
  89. var BoardCast = make(chan Msg, 100) // 广播通道
  90. func NotifyBoardCast() {
  91. for {
  92. select {
  93. case msg, ok := <-BoardCast:
  94. logrus.Infof("new NotifyBoardCast :%+v", msg)
  95. if ok {
  96. for _, client := range clients {
  97. logrus.Info("client ", client.Id, " ,conn:", client.isBroken)
  98. if client.isBroken {
  99. client.Lock.Lock()
  100. client.c.Close()
  101. close(client.msgChan)
  102. close(client.msgReq)
  103. client.Lock.Unlock()
  104. lock.Lock()
  105. delete(clients, client.Id)
  106. lock.Unlock()
  107. continue
  108. }
  109. if msg.CID != "" {
  110. if client.Id == cast.ToInt64(msg.CID) {
  111. logrus.Infoln("NotifyBoardCast ", msg)
  112. client.msgChan <- msg
  113. break
  114. }
  115. } else {
  116. client.msgChan <- msg
  117. }
  118. }
  119. }
  120. default:
  121. time.Sleep(1 * time.Second)
  122. }
  123. }
  124. }
  125. var upgrader = websocket.Upgrader{
  126. // 解决跨域问题
  127. CheckOrigin: func(r *http.Request) bool {
  128. return true
  129. },
  130. } // use default options
  131. func Websocket(ctx *gin.Context) {
  132. c, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
  133. if err != nil {
  134. logrus.Error("upgrade:", err)
  135. return
  136. }
  137. //t := token.GetAuthorization(ctx)
  138. //if t == "" {
  139. // logrus.Error("没有登录")
  140. // //return
  141. //}
  142. //claims, err := token.ParseToken(t)
  143. //if err != nil {
  144. // logrus.Error("token.ParseToken:", err.Error())
  145. // //return
  146. //}
  147. //if claims == nil {
  148. // claims = &token.UserClaims{}
  149. //}
  150. encodeToken := token.GetAuthorization(ctx)
  151. if encodeToken == "" {
  152. logrus.Error("没有登录")
  153. //return
  154. }
  155. bytesT, err := base64.URLEncoding.DecodeString(encodeToken)
  156. if err != nil {
  157. logrus.Error("base64.URLEncoding.DecodeString:", err.Error())
  158. }
  159. t := string(bytesT)
  160. tokenKey := config.GetTokenKey(t)
  161. if config.TokenRedis.Exists(tokenKey).Val() == 0 {
  162. logrus.Error("没有登录")
  163. //return
  164. }
  165. userStr := config.TokenRedis.Get(tokenKey).Val()
  166. claims := new(token.UserClaims)
  167. if err := jsoniter.UnmarshalFromString(userStr, claims); err != nil {
  168. logrus.Error("jsoniter.UnmarshalFromString:", err.Error())
  169. //return
  170. }
  171. if claims.ID == 0 {
  172. logrus.Error("没有登录")
  173. }
  174. now := time.Now().UnixNano() / 10e3
  175. client := &Client{
  176. Id: now,
  177. c: c,
  178. msgChan: make(chan Msg, 100),
  179. msgReq: make(chan ReqMsg, 100),
  180. lastHeart: time.Now().Unix(),
  181. claims: claims,
  182. }
  183. lock.Lock()
  184. clients[now] = client
  185. lock.Unlock()
  186. logrus.Infof("新用户接入ws, sid:%v, ip:%v, id:%v, userName:%v", client.Id, c.RemoteAddr(), client.claims.ID, client.claims.UserName)
  187. defer c.Close()
  188. client.Handler()
  189. }
  190. // GetClientsByAdminId 通过管理员ID查找在线的ws
  191. func GetClientsByAdminId(adminId int64) (checkedClients []*Client) {
  192. lock.Lock()
  193. defer lock.Unlock()
  194. for _, client := range clients {
  195. if client.claims.ID == adminId {
  196. checkedClients = append(checkedClients, client)
  197. }
  198. }
  199. return checkedClients
  200. }
  201. // SendToAdmin 发送消息给指定管理员
  202. func SendToAdmin(adminId int64, msg Msg) {
  203. checkedClients := GetClientsByAdminId(adminId)
  204. if len(checkedClients) == 0 {
  205. logrus.Warnf("当前管理员没有在线的ws admin_id:%v", adminId)
  206. return
  207. }
  208. for _, client := range checkedClients {
  209. logrus.Warnf("SendToAdmin id:%v, client.claims:%+v", client.Id, client.claims)
  210. client.WriteResp(msg.Bytes())
  211. }
  212. }