package docker import ( "context" "errors" "fmt" "gadmin/config" "gadmin/internal/admin/consts" "gadmin/internal/gorm/model" "gadmin/internal/gorm/query" "io" "net" "os" "strings" "time" "github.com/docker/docker/api/types" "github.com/docker/docker/client" "github.com/sirupsen/logrus" ) // api文档 // https://pkg.go.dev/github.com/docker/docker/client#Client.ContainerExecStart type PushDeployStopInput struct { Ctx context.Context Deploy *model.ServerDeploy Log *model.ServerDeployLog StopType int64 } type PushDeployTaskInput struct { Ctx context.Context Deploy *model.ServerDeploy Log *model.ServerDeployLog DeployServ []string `json:"deployServ" form:"deployServ"` // 部署服务选项 } type ReadDeployLogInput struct { Ctx context.Context Deploy *model.ServerDeploy LogFile string } type GraveProcessInput struct { Ctx context.Context Deploy *model.ServerDeploy } func getDeployScript(serverType string, publishType int32) (deployScript string) { var name string switch publishType { case consts.DeployPublishTypeStop: name = "stop" default: name = "deployment" } url := os.Getenv("DEPLOY_SH_URL") if os.Getenv("ADMIN_IS_LOCAL") == "1" { deployScript = url + serverType + "_" + name + "_local.sh" return } if os.Getenv("GIN_MODE") == "release" { deployScript = url + serverType + "_" + name + "_release.sh" return } if os.Getenv("GIN_MODE") == "debug" { deployScript = url + serverType + "_" + name + "_debug.sh" return } return } func findTemplate(packs []string, startServ []string, stopServ []string) []string { if len(packs) > 0 { return packs } if len(startServ) > 0 { return startServ } if len(stopServ) > 0 { return stopServ } return nil } func ParseDeployServArgs(serverType string, deployServ []string, isStartMpRoom int32) (string, string, string) { var packs []string var startServ []string var stopServ []string // _ = consts.DeployTypeLogin: //stopServ需要consts.DeployPackChapter if containsAll(deployServ) { serverType2 := strings.ToUpper(serverType) DownloadKey := fmt.Sprintf("DEPLOY_%v_DOWNLOAD", serverType2) StartKey := fmt.Sprintf("DEPLOY_%v_START", serverType2) StopKey := fmt.Sprintf("DEPLOY_%v_STOP", serverType2) tmp := os.Getenv(DownloadKey) if len(tmp) > 0 { packs = strings.Split(tmp, ",") } tmp = os.Getenv(StartKey) if len(tmp) > 0 { startServ = strings.Split(tmp, ",") } tmp = os.Getenv(StopKey) if len(tmp) > 0 { stopServ = strings.Split(tmp, ",") } // 允许只设一个的情况,找到这一个,并给其他赋值 templateList := findTemplate(packs, startServ, stopServ) if len(templateList) == 0 { logrus.Errorln("parseDeployServArgs findTemplate found nil") return "", "", "" } if len(packs) == 0 { packs = templateList } if len(startServ) == 0 { startServ = templateList } if len(stopServ) == 0 { stopServ = templateList } if isStartMpRoom != consts.INTTRUE { tmps := make([]string, 0, len(startServ)) for _, v := range startServ { if v == consts.DeployPackMPRoom { continue } tmps = append(tmps, v) } startServ = tmps } // switch serverType { // case consts.DeployTypeLogin: // packs = strings.Split(os.Getenv("DEPLOY_LOGIN_DOWNLOAD"), ",") // if len(packs) == 0 { // packs = append(packs, consts.DeployServLoginGroup...) // } // startServ = strings.Split(os.Getenv("DEPLOY_LOGIN_START"), ",") // if len(startServ) == 0 { // startServ = packs // } // stopServ = strings.Split(os.Getenv("DEPLOY_LOGIN_STOP"), ",") // if len(stopServ) == 0 { // stopServ = packs // } // case consts.DeployTypeGrave: // packs = strings.Split(os.Getenv("DEPLOY_GRAVE_DOWNLOAD"), ",") // if len(packs) == 0 { // packs = append(packs, consts.DeployServGraveGroup...) // } // startServ = strings.Split(os.Getenv("DEPLOY_GRAVE_START"), ",") // if len(startServ) == 0 { // startServ = packs // } // stopServ = strings.Split(os.Getenv("DEPLOY_GRAVE_STOP"), ",") // if len(stopServ) == 0 { // stopServ = packs // } // if isStartMpRoom != consts.INTTRUE { // tmps := make([]string, 0, len(startServ)) // for _, v := range startServ { // if v == consts.DeployPackMPRoom { // continue // } // tmps = append(tmps, v) // } // startServ = tmps // } // // if isStartMpRoom == consts.StartDeployIsMPRoom { // // startServ = append(startServ, consts.DeployServGraveGroup...) // // } else { // // startServ = append(startServ, consts.DeployServGraveGroupNoMPRoom...) // // } // // stopServ = append(stopServ, consts.DeployServGraveGroup...) // // stopServ = append(stopServ, consts.DeployPackChapter) // // case consts.DeployTypeGate: // // startServ = append(startServ, consts.DeployServGateGroup...) // default: // // 处理未知的 serverType // logrus.Errorf("parseDeployServArgs containsAll default serverType:%+v, deployServ:%+v", serverType, deployServ) // return "", "", "" // } } else { startServ = append(startServ, deployServ...) stopServ = append(startServ, deployServ...) packs = append(packs, deployServ...) } return strings.Join(packs, ","), strings.Join(startServ, ","), strings.Join(stopServ, ",") } // // supplementGate 追加网关服务 // func supplementGate(serv []string) []string { // gates := strings.Split(os.Getenv("DEPLOY_GRAVE_GATE"), ",") // lst := make([]string, 0, len(serv)+len(gates)) // for _, v := range serv { // if v == "gate" { // for _, v2 := range gates { // if len(v2) > 0 { // lst = append(lst, v2) // } // } // } else { // lst = append(lst, v) // } // } // return lst // } func containsAll(arr []string) bool { for _, val := range arr { if val == "all" { return true } } return false } func PushDeployTask(in PushDeployTaskInput) (err error) { defer func() { var ( ql = query.Use(config.DB).ServerDeployLog status = consts.DeployStatusIng errMsg = "" ) if err != nil { status = consts.DeployStatusErr errMsg = err.Error() } _, err = query.Use(config.DB).ServerDeployLog.WithContext(in.Ctx).Where(ql.ID.Eq(in.Log.ID)).Updates(model.ServerDeployLog{ ErrorMsg: errMsg, Status: status, EndAt: time.Now(), }) if err != nil { logrus.Warningf("PushDeployTask ServerDeployLog Updates err:%+v", err) } }() packs, startServ, stopServ := ParseDeployServArgs(in.Deploy.ServerType, in.DeployServ, in.Deploy.IsMproom) if len(packs) == 0 || len(startServ) == 0 || len(stopServ) == 0 { err = errors.New("download pack is nil") logrus.Warningln("PushDeployTask ParseDeployServArgs list is nil") return } logrus.Infof("ParseDeployServArgs %+v", packs) cli, err := newStrictClient(in.Ctx, in.Deploy.DockerAddr, in.Deploy.CaPem, in.Deploy.CertPem, in.Deploy.KeyPem, in.Deploy.ContainerName) defer cli.Close() if err != nil { logrus.Warningf("PushDeployTask newStrictClient err:%+v", err) return } var ( sh = "rm -rf deployment.sh && curl %s >> deployment.sh && chmod +x deployment.sh && ./deployment.sh %s %s %s %s %s %s %s %s %s" execSh = fmt.Sprintf(sh, getDeployScript(in.Deploy.ServerType, in.Log.PublishType), //curl in.Deploy.ContainerName, //appName in.Log.Version, //version in.Log.TraceID, //traceID os.Getenv("DEPLOY_ORIGIN_URL"), //originURL os.Getenv("DEPLOY_NOTIFY_URL"), //notifyURL os.Getenv("DEPLOY_MODE"), //mode packs, //packLstStr startServ, //serverListStr stopServ, //stopServerListStr ) ) logrus.Infof("ContainerExecCreate execSh:%+v", execSh) execResp, err := cli.ContainerExecCreate(in.Ctx, in.Deploy.ContainerName, types.ExecConfig{ AttachStdin: true, AttachStdout: true, Cmd: []string{"sh", "-c", execSh}, }, ) logrus.Infof("ContainerExecCreate execResp:%+v", execResp) resp, err := cli.ContainerExecAttach(in.Ctx, execResp.ID, types.ExecStartCheck{ Detach: false, Tty: false, }, ) if err != nil { logrus.Warningf("ContainerExecAttach err:%+v", err) return } defer resp.Close() return } func PushDeployStop(in PushDeployStopInput) (err error) { defer func() { var ( ql = query.Use(config.DB).ServerDeployLog status = consts.DeployStatusIng errMsg = "" ) if err != nil { status = consts.DeployStatusErr errMsg = err.Error() } _, err = query.Use(config.DB).ServerDeployLog.WithContext(in.Ctx).Where(ql.DeployID.Eq(in.Log.ID)).Updates(model.ServerDeployLog{ ErrorMsg: errMsg, Status: status, EndAt: time.Now(), }) if err != nil { logrus.Warningf("PushDeployStop ServerDeployLog Updates err:%+v", err) } }() cli, err := newStrictClient(in.Ctx, in.Deploy.DockerAddr, in.Deploy.CaPem, in.Deploy.CertPem, in.Deploy.KeyPem, in.Deploy.ContainerName) defer cli.Close() if err != nil { logrus.Warningf("PushDeployStop newStrictClient err:%+v", err) return } stopCmd := fmt.Sprintf("rm -rf stop.sh && curl %s >> stop.sh && chmod +x stop.sh && ./stop.sh %s %v %s %s", getDeployScript(in.Deploy.ServerType, in.Log.PublishType), in.Deploy.ContainerName, in.StopType, in.Log.TraceID, os.Getenv("DEPLOY_NOTIFY_URL"), ) logrus.Infof("PushDeployStop stopCmd:%+v", stopCmd) execResp, err := cli.ContainerExecCreate(in.Ctx, in.Deploy.ContainerName, types.ExecConfig{ AttachStdin: true, AttachStdout: true, Cmd: []string{"sh", "-c", stopCmd}, }, ) if err != nil { logrus.Errorf("PushDeployStop ContainerExecCreate err:%+v", err) return } logrus.Infof("PushDeployStop ContainerExecCreate execResp:%+v", execResp) resp, err := cli.ContainerExecAttach(in.Ctx, execResp.ID, types.ExecStartCheck{ Detach: false, Tty: false, }, ) if err != nil { logrus.Errorf("PushDeployStop ContainerExecAttach err:%+v", err) return } defer resp.Close() return } func GetContainer(ctx context.Context, dockerAddr, caPem, certPem, keyPem, containerName string) (cr *types.Container, err error) { key := config.UKey("GetContainer", dockerAddr) if foo, found := config.Cache.Get(key); found { containers, ok := foo.([]types.Container) if !ok { err = errors.New("GetContainer 缓存数据断言失败!") } for _, container := range containers { if container.Names[0] == "/"+containerName { cr = &container break } } return } var cli client.APIClient if caPem == "" { cli, err = newAPIClient(dockerAddr) if err != nil { logrus.Warningf("newAPIClient err:%+v", err) return } } else { cli, err = newTLSAPIClient(dockerAddr, caPem, certPem, keyPem) if err != nil { logrus.Warningf("newTLSAPIClient err:%+v", err) return } } defer cli.Close() containers, err := cli.ContainerList(ctx, types.ContainerListOptions{All: true}) if err != nil { return } for _, container := range containers { if container.Names[0] == "/"+containerName { cr = &container break } } // 只保证在遍历列表时够用就行 config.Cache.Set(key, containers, time.Second*3) return } func GetContainerList(ctx context.Context, dockerAddr, caPem, certPem, keyPem string) (containers []types.Container, err error) { var cli client.APIClient if caPem == "" { cli, err = newAPIClient(dockerAddr) if err != nil { logrus.Warningf("newAPIClient err:%+v", err) return } } else { cli, err = newTLSAPIClient(dockerAddr, caPem, certPem, keyPem) if err != nil { logrus.Warningf("newTLSAPIClient err:%+v", err) return } } defer cli.Close() containers, err = cli.ContainerList(ctx, types.ContainerListOptions{All: true}) if err != nil { return } return } func newStrictClient(ctx context.Context, dockerAddr, caPem, certPem, keyPem, containerName string) (cli client.APIClient, err error) { if caPem == "" { cli, err = newAPIClient(dockerAddr) if err != nil { logrus.Warningf("newAPIClient err:%+v", err) return } } else { cli, err = newTLSAPIClient(dockerAddr, caPem, certPem, keyPem) if err != nil { logrus.Warningf("newTLSAPIClient err:%+v", err) return } } containers, err := cli.ContainerList(ctx, types.ContainerListOptions{All: true}) if err != nil { return } verify := false for _, container := range containers { if container.Names[0] == "/"+containerName { verify = true if "running" != container.State { err = errors.New(fmt.Sprintf("容器没有运行,当前状态:%v", container.State)) return } break } } if !verify { err = errors.New(fmt.Sprintf("docker中没有找到容器:%v", containerName)) return } return } func ReadDeployLog(in ReadDeployLogInput) (b []byte, err error) { if in.LogFile == "" { err = errors.New("没有找到日志路径") return } cli, err := newStrictClient(in.Ctx, in.Deploy.DockerAddr, in.Deploy.CaPem, in.Deploy.CertPem, in.Deploy.KeyPem, in.Deploy.ContainerName) defer cli.Close() if err != nil { logrus.Warningf("ReadDeployLog newStrictClient err:%+v", err) return } var sh = `cat %s` execResp, err := cli.ContainerExecCreate(in.Ctx, in.Deploy.ContainerName, types.ExecConfig{ AttachStdin: true, AttachStdout: true, Cmd: []string{"sh", "-c", fmt.Sprintf(sh, in.LogFile)}, }, ) logrus.Infof("readDeploymentLog ContainerExecCreate execResp:%+v", execResp) resp, err := cli.ContainerExecAttach(in.Ctx, execResp.ID, types.ExecStartCheck{ Detach: false, Tty: false, }, ) if err != nil { logrus.Warningf("readDeploymentLog ContainerExecAttach err:%+v", err) return } defer resp.Close() return io.ReadAll(resp.Reader) } // 获取容器内运行的进程 func GetGraveProcess(in GraveProcessInput) (b []byte, err error) { logrus.Infof("GetGraveProcess Start...") cli, err := newStrictClient(in.Ctx, in.Deploy.DockerAddr, in.Deploy.CaPem, in.Deploy.CertPem, in.Deploy.KeyPem, in.Deploy.ContainerName) defer cli.Close() if err != nil { logrus.Warningf("GetGraveProcess newStrictClient err:%+v", err) return } cmd := fmt.Sprintf("ps -aux | grep %s | grep -v grep | grep -v deployment", in.Deploy.ContainerName) execConfig := types.ExecConfig{ Cmd: []string{"sh", "-c", cmd}, AttachStdout: true, AttachStderr: true, } execRes, err := cli.ContainerExecCreate(in.Ctx, in.Deploy.ContainerName, execConfig) if err != nil { logrus.WithField("method", "GetGraveProcess").Errorf("ContainerExecCreate Err:%+v", execRes) return nil, err } execStartCheck := types.ExecStartCheck{} hr, err := cli.ContainerExecAttach(in.Ctx, execRes.ID, execStartCheck) if err != nil { logrus.WithField("method", "GetGraveProcess").Errorf("ContainerExecAttach Err: %+v", err) return } defer hr.Close() //输出 data, err := io.ReadAll(hr.Reader) if err != nil { logrus.WithField("method", "GetGraveProcess").Errorf("io.ReadAll Err: %+v", err) return } logrus.Infof("GetGraveProcess end.") return data, nil } func RetryGraveProcess(in GraveProcessInput, maxRetry, retry int) (b []byte, err error) { cli, err := newStrictClient(in.Ctx, in.Deploy.DockerAddr, in.Deploy.CaPem, in.Deploy.CertPem, in.Deploy.KeyPem, in.Deploy.ContainerName) if err != nil { logrus.Warningf("GetGraveProcess newStrictClient err:%+v", err) if maxRetry > 0 && retry < maxRetry { time.Sleep(time.Second * 2) retry++ return RetryGraveProcess(in, maxRetry, retry) } return } var sh = `ps -aux | grep grave | grep -v grep | grep -v deployment` execResp, err := cli.ContainerExecCreate(in.Ctx, in.Deploy.ContainerName, types.ExecConfig{ AttachStdin: true, AttachStdout: true, Cmd: []string{"sh", "-c", sh}, }, ) logrus.Infof("GetGraveProcess ContainerExecCreate execResp:%+v", execResp) resp, err := cli.ContainerExecAttach(in.Ctx, execResp.ID, types.ExecStartCheck{ Detach: false, Tty: false, }, ) if err != nil { logrus.Warningf("GetGraveProcess ContainerExecAttach err:%+v", err) if maxRetry > 0 && retry < maxRetry { time.Sleep(time.Second * 2) retry++ return RetryGraveProcess(in, maxRetry, retry) } return } defer resp.Close() defer cli.Close() return io.ReadAll(resp.Reader) } func AuthZPluginTLS() { const ( testDaemonHTTPSAddr = "tcp://119.91.150.156:13375" cacertPath = "/root/certs/119.91.150.156/pem/ca.pem" //serverCertPath = "../../testdata/https/server-cert.pem" //serverKeyPath = "../../testdata/https/server-key.pem" clientCertPath = "/root/certs/119.91.150.156/pem/client-cert.pem" clientKeyPath = "/root/certs/119.91.150.156/pem/client-key.pem" ) ctx := context.Background() c, err := newTLSAPIClient(testDaemonHTTPSAddr, cacertPath, clientCertPath, clientKeyPath) defer c.Close() if err != nil { logrus.Warningf("newTLSAPIClient err:%+v", err) return } list, err := c.ContainerList(context.Background(), types.ContainerListOptions{}) if err != nil { logrus.Warningf("ContainerList err:%+v", err) return } for i, container := range list { logrus.Warningf("ContainerList i:%v container:%+v \n\n", i, container) } //listImages(c) // vers, err := c.ServerVersion(ctx) if err != nil { logrus.Warningf("ServerVersion err:%+v", err) return } logrus.Warningf("ServerVersion vers:%+v", vers) //var sh = "rm -rf deployment.sh && curl http://mysql.facms.cn/sh/deployment.sh >> deployment.sh && chmod +x deployment.sh && ./deployment.sh %s %s %s" //var traceID = character.Md5Content([]byte(character.RandStringRunes(32))) // //execResp, err := c.ContainerExecCreate(ctx, "gadmin", // types.ExecConfig{ // AttachStdin: true, // AttachStdout: true, // Cmd: []string{"sh", "-c", fmt.Sprintf(sh, "test123", "v1.6.2", traceID)}, // }, //) // //logrus.Infof("ContainerExecCreate execResp:%+v", execResp) // //resp, err := c.ContainerExecAttach(ctx, execResp.ID, // types.ExecStartCheck{ // Detach: false, // Tty: false, // }, //) //if err != nil { // logrus.Warningf("ContainerExecAttach err:%+v", err) // return //} // //defer resp.Close() //r, err := io.ReadAll(resp.Reader) //logrus.Warningf("ContainerExecAttach2 resCh got:%+v", string(r)) //var ( // waitCh = make(chan struct{}) // resCh = make(chan struct { // content string // err error // }, 1) //) // //go func() { // close(waitCh) // defer close(resCh) // r, err := io.ReadAll(resp.Reader) // // resCh <- struct { // content string // err error // }{ // content: string(r), // err: err, // } //}() // //<-waitCh //select { //case <-time.After(3 * time.Second): // logrus.Fatal("failed to read the content in time") //case got := <-resCh: // // logrus.Warningf("ContainerExecAttach resCh got:%+v", got) // //assert.NilError(t, got.err) // // // //// NOTE: using Contains because no-tty's stream contains UX information // //// like size, stream type. // //assert.Assert(t, is.Contains(got.content, expected)) //} } func newTLSAPIClient(host, cacertPath, certPath, keyPath string) (client.APIClient, error) { dialer := &net.Dialer{ KeepAlive: 30 * time.Second, Timeout: 30 * time.Second, } return client.NewClientWithOpts( client.WithTLSClientConfig(cacertPath, certPath, keyPath), client.WithDialContext(dialer.DialContext), client.WithHost(host), client.WithAPIVersionNegotiation(), ) } func newAPIClient(host string) (client.APIClient, error) { dialer := &net.Dialer{ KeepAlive: 30 * time.Second, Timeout: 30 * time.Second, } return client.NewClientWithOpts( client.WithDialContext(dialer.DialContext), client.WithHost(host), client.WithAPIVersionNegotiation(), ) }