123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- package ws
- import (
- "encoding/base64"
- "encoding/json"
- "gadmin/config"
- "gadmin/utility/token"
- jsoniter "github.com/json-iterator/go"
- "net/http"
- "sync"
- "time"
- "github.com/gin-gonic/gin"
- "github.com/gorilla/websocket"
- "github.com/sirupsen/logrus"
- "github.com/spf13/cast"
- )
- type Msg struct {
- Type int64 `json:"type"` // 1 日志 2 用户查询
- Id int64 `json:"id"`
- Code int `json:"code"`
- Msg string `json:"msg"`
- CID string `json:"cid"` // clientId
- Extra map[string]interface{} `json:"extra,omitempty"`
- }
- func (msg *Msg) Bytes() []byte {
- data, _ := json.Marshal(msg)
- return data
- }
- type ReqMsg struct {
- Type int `json:"type"`
- Data string `json:"data"` //
- }
- type Client struct {
- Id int64
- c *websocket.Conn
- msgChan chan Msg
- msgReq chan ReqMsg
- lastHeart int64
- isBroken bool
- claims *token.UserClaims
- Lock sync.Mutex
- }
- func NewClient(id int64, conn *websocket.Conn) *Client {
- cli := &Client{
- Id: id,
- c: conn,
- msgChan: make(chan Msg, 100),
- msgReq: make(chan ReqMsg, 100),
- lastHeart: time.Now().Unix(),
- }
- return cli
- }
- func (client *Client) Handler() {
- go func() {
- for !client.isBroken {
- _, data, err := client.c.ReadMessage()
- if err != nil {
- logrus.Errorln("接收消息:", err.Error())
- client.isBroken = true
- break
- }
- var msg ReqMsg
- //logrus.Warn("接收到原始消息:", string(data))
- if err = json.Unmarshal(data, &msg); err != nil {
- logrus.Warn("Handler json.Unmarshal err:", err.Error())
- break
- }
- client.msgReq <- msg
- }
- }()
- for !client.isBroken {
- select {
- case msg, ok := <-client.msgReq: // 接收
- if ok {
- client.HandlerReqMsg(msg)
- }
- case msg, ok := <-client.msgChan: // 发送
- if ok {
- err := client.WriteResp(msg.Bytes())
- if err != nil {
- logrus.Error("写入:", err.Error())
- client.isBroken = true
- }
- }
- }
- }
- }
- var clients = make(map[int64]*Client)
- var lock sync.Mutex
- var BoardCast = make(chan Msg, 100) // 广播通道
- func NotifyBoardCast() {
- for {
- select {
- case msg, ok := <-BoardCast:
- logrus.Infof("new NotifyBoardCast :%+v", msg)
- if ok {
- for _, client := range clients {
- logrus.Info("client ", client.Id, " ,conn:", client.isBroken)
- if client.isBroken {
- client.Lock.Lock()
- client.c.Close()
- close(client.msgChan)
- close(client.msgReq)
- client.Lock.Unlock()
- lock.Lock()
- delete(clients, client.Id)
- lock.Unlock()
- continue
- }
- if msg.CID != "" {
- if client.Id == cast.ToInt64(msg.CID) {
- logrus.Infoln("NotifyBoardCast ", msg)
- client.msgChan <- msg
- break
- }
- } else {
- client.msgChan <- msg
- }
- }
- }
- default:
- time.Sleep(1 * time.Second)
- }
- }
- }
- var upgrader = websocket.Upgrader{
- // 解决跨域问题
- CheckOrigin: func(r *http.Request) bool {
- return true
- },
- } // use default options
- func Websocket(ctx *gin.Context) {
- c, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
- if err != nil {
- logrus.Error("upgrade:", err)
- return
- }
- //t := token.GetAuthorization(ctx)
- //if t == "" {
- // logrus.Error("没有登录")
- // //return
- //}
- //claims, err := token.ParseToken(t)
- //if err != nil {
- // logrus.Error("token.ParseToken:", err.Error())
- // //return
- //}
- //if claims == nil {
- // claims = &token.UserClaims{}
- //}
- encodeToken := token.GetAuthorization(ctx)
- if encodeToken == "" {
- logrus.Error("没有登录")
- //return
- }
- bytesT, err := base64.URLEncoding.DecodeString(encodeToken)
- if err != nil {
- logrus.Error("base64.URLEncoding.DecodeString:", err.Error())
- }
- t := string(bytesT)
- tokenKey := config.GetTokenKey(t)
- if config.TokenRedis.Exists(tokenKey).Val() == 0 {
- logrus.Error("没有登录")
- //return
- }
- userStr := config.TokenRedis.Get(tokenKey).Val()
- claims := new(token.UserClaims)
- if err := jsoniter.UnmarshalFromString(userStr, claims); err != nil {
- logrus.Error("jsoniter.UnmarshalFromString:", err.Error())
- //return
- }
- if claims.ID == 0 {
- logrus.Error("没有登录")
- }
- now := time.Now().UnixNano() / 10e3
- client := &Client{
- Id: now,
- c: c,
- msgChan: make(chan Msg, 100),
- msgReq: make(chan ReqMsg, 100),
- lastHeart: time.Now().Unix(),
- claims: claims,
- }
- lock.Lock()
- clients[now] = client
- lock.Unlock()
- logrus.Infof("新用户接入ws, sid:%v, ip:%v, id:%v, userName:%v", client.Id, c.RemoteAddr(), client.claims.ID, client.claims.UserName)
- defer c.Close()
- client.Handler()
- }
- // GetClientsByAdminId 通过管理员ID查找在线的ws
- func GetClientsByAdminId(adminId int64) (checkedClients []*Client) {
- lock.Lock()
- defer lock.Unlock()
- for _, client := range clients {
- if client.claims.ID == adminId {
- checkedClients = append(checkedClients, client)
- }
- }
- return checkedClients
- }
- // SendToAdmin 发送消息给指定管理员
- func SendToAdmin(adminId int64, msg Msg) {
- checkedClients := GetClientsByAdminId(adminId)
- if len(checkedClients) == 0 {
- logrus.Warnf("当前管理员没有在线的ws admin_id:%v", adminId)
- return
- }
- for _, client := range checkedClients {
- logrus.Warnf("SendToAdmin id:%v, client.claims:%+v", client.Id, client.claims)
- client.WriteResp(msg.Bytes())
- }
- }
|