golang使用sse事件流调用AI大模型

慈云数据 8个月前 (04-11) 技术支持 138 0

目录

  • 前言
  • 第一步 解决没有官方SDK的痛
  • 第二步 实现流式传输
    • 什么是SSE,SSE和WebSocket的区别
    • 基于gin实现SSE服务器
    • gin接收AI大模型数据流响应
      • 1. 前端携带自定义问题请求后端接口
      • 2. 后端接受请求解析问题,然后创建stream对象
      • 3. 构建请求参数,调用创建数据流客户端接口
      • 4. 调用http客户端,发起请求,将响应结果封装在数据流对象里面
      • 5. 获取到数据流,然后就监听返回的数据流
      • 6. 具体接受数据流的方法
      • 7. 就是把解析到的数据流放入通道,C.Stream监听通道获取流,使用C.sseevent返回前端
      • 8. over,你就可以看到效果了。
      • 如有不足,请指正

        前言

        本次Ai大模型,我们选择清华大学出品的智谱AI大模型,为什么呢?因为chatgpt已经有开源第三方库,可以直接调,但是要科学上网(借助代理也是可是实现的)。只要是公司业务有这方面的需求。。。

        第一步 解决没有官方SDK的痛

        接口文档

        在这里插入图片描述

        没办法,照着python,java比葫芦画瓢,自己造。

        在这里插入图片描述

        我们来到非SDK用户这边,文档说先获取APIkey,然后根据APIkey生成JWTtoken,把token加到请求头,然后使用http请求就可以了。以下是golang代码:

        // 这边我已经封装成了方法,只需要传入apikey,token过期时间
        token, err := go_ZhiPuAI.GenerateJwtToken(global.GvaConfig.ZhiPuAI.ApiKey, global.GvaConfig.ZhiPuAI.ExpSeconds) 
        //这是具体代码
        func GenerateJwtToken(apiKey string, expSeconds int) (string, error) {
        	// 分割apiKey以获取id和secret
        	parts := strings.Split(apiKey, ".")
        	if len(parts) != 2 {
        		return "", fmt.Errorf("invalid apiKey: %v", parts)
        	}
        	id, secret := parts[0], parts[1]
        	// 创建JWT的payload
        	claims := jwt.MapClaims{
        		"api_key":   id,
        		"exp":       time.Now().Unix()*1000 + int64(expSeconds)*1000,
        		"timestamp": time.Now().Unix() * 1000,
        	}
        	// 创建一个新的Token对象,并指定签名算法和claims
        	token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
        	// 添加headers
        	token.Header["alg"] = "HS256"
        	token.Header["sign_type"] = "SIGN"
        	// 使用secret对token进行签名
        	tokenString, err := token.SignedString([]byte(secret))
        	if err != nil {
        		return "", err
        	}
        	return tokenString, nil
        }
        

        将鉴权 token 放入 HTTP 请求的 header 中

        用户需要将生成的鉴权 token 放入 HTTP 的 Authorization header 头中:

        Authorization: 鉴权token

        Example:curl请求中的token参数示例
        curl --location 'https://open.bigmodel.cn/api/paas/v4/chat/completions' \
        --header 'Authorization: Bearer ' \
        --header 'Content-Type: application/json' \
        --data '{
            "model": "glm-4",
            "messages": [
                {
                    "role": "user",
                    "content": "你好"
                }
            ]
        }'
        

        这样我们就可以访问了。

        第二步 实现流式传输

        这是python的代码示例,我们只需在请求字段里加上”stream=True“即可开启流式传输

        from zhipuai import ZhipuAI
        client = ZhipuAI(api_key="") # 请填写您自己的APIKey
        response = client.chat.completions.create(
            model="glm-4",  # 填写需要调用的模型名称
            messages=[
                {"role": "system", "content": "你是一个乐于解答各种问题的助手,你的任务是为用户提供专业、准确、有见地的建议。"},
                {"role": "user", "content": "我对太阳系的行星非常感兴趣,特别是土星。请提供关于土星的基本信息,包括其大小、组成、环系统和任何独特的天文现象。"},
            ],
            stream=True,
        )
        for chunk in response:
            print(chunk.choices[0].delta)
        

        因为python官方库已经实现好了,只需一个for循环,golang又要自己动手实现,淦。。

        什么是SSE,SSE和WebSocket的区别

        WebSocket:一种双向通信协议,同时支持服务端和客户端之间的实时交互。WebSocket 是基于 TCP 的长连接,和HTTP 协议相比,它能实现轻量级的、低延迟的数据传输,非常适合实时通信场景,主要用于交互性强的双向通信。

        SSE(Server-Sent Events)是一种基于 HTTP 协议的推送技术。服务端可以使用 SSE

        来向客户端推送数据,但客户端不能通过SSE向服务端发送数据。相较于 WebSocket,SSE 更简单、更轻量级,但只能实现单向通信。

        SSE(Server-Sent Events)和 WebSocket 都是用于实现服务器与客户端之间实时双向通信的技术。虽然它们都可以用于实时更新数据,但它们在实现方式、特点和适用场景上有着明显的区别。

        两者的主要区别:

        SSEWebSocket
        通信单向通信双向通信
        协议HTTPWebSocket
        自动重连支持不支持,需要客服端自行支持
        数据格式文本格式二进制数据、文本格式
        跨域不支持(若跨域需配置指定的Access-Control-Allow-Origin)支持
        适用场景SSE 适用于需要服务器向客户端单向实时推送数据的场景,例如实时更新的新闻、股票行情等。优点:简单易用,对服务器压力小,浏览器兼容性好。缺点:只支持单向通信,无法进行双向交互。适用于需要客户端和服务器之间实时双向通信的场景,例如聊天室、实时协作应用等。优点:支持双向通信,实时性更高,可以实现更丰富的交互效果。缺点:需要独立的 TCP 连接,对服务器压力更大,浏览器兼容性相对较差。

        基于gin实现SSE服务器

        //前端代码
        
        
        
            SSE test
            
                // 向后端服务器发起sse请求
                const es = new EventSource("http://127.0.0.1:9000/v1/VoiceoverScript/chat");
                // 监听事件流
                es.onmessage = function (e) {
                    document.getElementById("test")
                        .insertAdjacentHTML("beforeend", "
      • " + e.data + "
      • "); console.log(e); } // 监听”chat“事件流 es.addEventListener("chat", (e) => { document.getElementById("test") .insertAdjacentHTML("beforeend", "" + e.data + ""); console.log(e) }); es.onerror = function (e) { // readyState说明 // 0:浏览器与服务端尚未建立连接或连接已被关闭 // 1:浏览器与服务端已成功连接,浏览器正在处理接收到的事件及数据 // 2:浏览器与服务端建立连接失败,客户端不再继续建立与服务端之间的连接 console.log("readyState = " + e.currentTarget.readyState); }

        SSE test

        //后端代码
        //注意 **我注释的代码,是不使用gin框架封装的Stream方法,也就是C.Stream(func())和C.ssevent(),只是C.Stream要改成for循环持续的从通道里面进行读,直到通道关闭,结束for循环**
        package main
        import (
        	"fmt"
        	"github.com/gin-gonic/gin"
        	"io"
        	"testing"
        	"time"
        )
        func SSE(c *gin.Context) {
        	// 设置响应头,告诉前端适用event-stream事件流交互
        	//c.Writer.Header().Set("Content-Type", "text/event-stream")
        	//c.Writer.Header().Set("Cache-Control", "no-cache")
        	//c.Writer.Header().Set("Connection", "keep-alive")
        	// 判断是否支持sse
        	//w := c.Writer
        	//flusher, _ := w.(http.Flusher)
        	// 接收前端页面关闭连接通知
        	closeNotify := c.Request.Context().Done()
        	// 开启协程监听前端页面是否关闭了连接,关闭连接会触发此方法
        	go func() {
        		
        		// 记得关闭通道
        		defer close(Chan)
        		// 模拟gpt回复
        		s := `在远古时代的一个神秘而神奇的大陆上,有着一座被人们称为“永恒之城”的城市。这座城市建立在一座巍峨的山脉之中,被壮丽的自然景观所环绕。`
        		//
        		for _, char := range s {
        			Chan 
        		i := 
        	engine := gin.Default()
        	// 设置跨域中间件
        	engine.Use(func(context *gin.Context) {
        		origin := context.GetHeader("Origin")
        		// 允许 Origin 字段中的域发送请求
        		context.Writer.Header().Add("Access-Control-Allow-Origin", origin) // 这边我的前端页面在63342,会涉及跨域,这个根据自己情况设置,或者直接设置为”*“,放行所有的
        		// 设置预验请求有效期为 86400 秒
        		context.Writer.Header().Set("Access-Control-Max-Age", "86400")
        		// 设置允许请求的方法
        		context.Writer.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE, UPDATE, PATCH")
        		// 设置允许请求的 Header
        		context.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Apitoken")
        		// 设置拿到除基本字段外的其他字段,如上面的Apitoken, 这里通过引用Access-Control-Expose-Headers,进行配置,效果是一样的。
        		context.Writer.Header().Set("Access-Control-Expose-Headers", "Content-Length, Access-Control-Allow-Headers")
        		// 配置是否可以带认证信息
        		context.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
        		// OPTIONS请求返回200
        		if context.Request.Method == "OPTIONS" {
        			fmt.Println(context.Request.Header)
        			context.AbortWithStatus(200)
        		} else {
        			context.Next()
        		}
        	})
        	engine.GET("/v1/VoiceoverScript/chat", SSE) // 记得适用get请求,我用post前端报404,资料说是SSE只支持get请求
        	engine.Run(":9000")
        }
        
        	// 需要登录
        	tokenGroup := engine.Group("/v1/VoiceoverScript").Use(middleware.JWTAuthMiddleware())
        	{
        		tokenGroup.GET("/chat", v1.StartChat)
        	}
        }
        
        		response.ResponseError(c, response.CodeInvalidToken)
        	}
        	// 获取Ai大模型数据流
        	stream, err := ai.ProcessService(c, &request.Request{Prompt: question}, int64(u.UserID))
        	if err != nil {
        		global.GvaLogger.Sugar().Error("aiClient Process bizError: %v", err)
        		response.ResponseErrorWithMsg(c, response.ServerError, "错误:"+err.Error())
        		return
        	}
        	defer stream.Close()
        	....
        
        	//TODO 根据用户id更新减少用户的使用次数
        	chatRequest := go_ZhiPuAI.ChatCompletionRequest{
        		Model: "glm-4",
        		Messages: []go_ZhiPuAI.ChatCompletionMessage{
        			{
        				Role:    go_ZhiPuAI.ChatMessageRoleAssistant,
        				Content: "你是一个聪明且富有创造力的世界上最通情达理的人"},
        			{
        				Role:    go_ZhiPuAI.ChatMessageRoleUser,
        				Content: r.Prompt,
        			},
        		},
        		Stream: true,
        	}
        	token, err := go_ZhiPuAI.GenerateJwtToken(global.GvaConfig.ZhiPuAI.ApiKey, global.GvaConfig.ZhiPuAI.ExpSeconds)
        	if err != nil {
        		return nil, err
        	}
        	client := go_ZhiPuAI.NewClientWithConfig(
        		go_ZhiPuAI.ClientConfig{
        			AuthToken:  token,
        			BaseURL:    global.GvaConfig.ZhiPuAI.BaseUrl,
        			HTTPClient: &http.Client{},
        		},
        	)
        	if client == nil {
        		global.GvaLogger.Sugar().Error("智普客户端初始化失败!")
        		return nil, errors.New("key失效")
        	}
        	// 主要是这里
        	// 携带自定义请求参数请求然后返回数据流对象
        	completionStream, err := client.CreateChatCompletionStream(ctx, chatRequest)
        	if err != nil {
        		global.GvaLogger.Sugar().Error("aiClient client.CreateChatCompletion bizError:%v", err)
        		//goUtil.New(func() {    起一个协程把当前key的状态设为0,不可用
        		//	refreshKey(gptClient)
        		//})
        		return nil, err
        	}
        	return completionStream, nil
        }
        
        	urlSuffix := "/chat/completions"
        	//if !checkEndpointSupportsModel(urlSuffix, request.Model) {
        	//	err = ErrChatCompletionInvalidModel
        	//	return
        	//}
        	request.Stream = true
        	req, err := c.newStreamRequest(ctx, "POST", urlSuffix, request)
        	if err != nil {
        		return
        	}
        	resp, err := c.config.HTTPClient.Do(req) //nolint:bodyclose // body is closed in stream.Close()
        	if err != nil {
        		return
        	}
        	// 使用http客户端发起请求,把响应结果使用bufio.NewReader(resp.Body)存入数据流对象
        	stream = &ChatCompletionStream{
        		streamReader: &streamReader[ChatCompletionStreamResponse]{
        			emptyMessagesLimit: c.config.EmptyMessagesLimit,
        			reader:             bufio.NewReader(resp.Body),
        			response:           resp,
        			errAccumulator:     newErrorAccumulator(),
        			unmarshaler:        &jsonUnmarshaler{},
        		},
        	}
        	return
        }
        Prompt: question}, int64(u.UserID))
        	if err != nil {
        		global.GvaLogger.Sugar().Error("aiClient Process bizError: %v", err)
        		response.ResponseErrorWithMsg(c, response.ServerError, "错误:"+err.Error())
        		return
        	}
        	defer stream.Close()
        	// 通道
        	chanStream := make(chan *go_ZhiPuAI.ChatProcessResponse, 100)
        	// 异步协程
        	go func() {
        		defer stream.Close()
        		defer close(chanStream)
        		for i := 0; ; i++ {
        			streamResponse, err := stream.Recv()
        			if errors.Is(err, io.EOF) {
        				global.GvaLogger.Sugar().Debug("Stream finished")
        				chanStream 
        				global.GvaLogger.Sugar().Error("Stream error: %v\n", err)
        				chanStream 
        				global.GvaLogger.Sugar().Debug("Stream finished")
        				chanStream 
        				ID:              streamResponse.ID,
        				Role:            choice.Delta.Role,
        				Segment:         go_ZhiPuAI.SegmentText,
        				DateTime:        time.Now().Format("2006-01-02 15:04:05"),
        				Content:         choice.Delta.Content,
        				ParentMessageID: go_ZhiPuAI.AssistantMessageId,
        			}
        			if i == 0 {
        				data.Segment = go_ZhiPuAI.SegmentStart
        			}
        			if choice.FinishReason == go_ZhiPuAI.SegmentStop {
        				data.Segment = go_ZhiPuAI.SegmentStop
        			}
        			chanStream 
        				return
        			}
        		}
        	}()
        
        	if stream.isFinished {
        		err = io.EOF
        		return
        	}
        	var emptyMessagesCount uint
        waitForData:
        	line, err := stream.reader.ReadBytes('\n')
        	li, _ := stream.reader.ReadBytes('\n')
        	fmt.Println(li)
        	if err != nil {
        		respErr := stream.errAccumulator.unmarshalError()
        		if respErr != nil {
        			err = fmt.Errorf("error, %w", respErr.Error)
        		}
        		return
        	}
        	var headerData = []byte("data: ")
        	line = bytes.TrimSpace(line)
        	if !bytes.HasPrefix(line, headerData) {
        		if writeErr := stream.errAccumulator.write(line); writeErr != nil {
        			err = writeErr
        			return
        		}
        		emptyMessagesCount++
        		if emptyMessagesCount  stream.emptyMessagesLimit {
        			err = ErrTooManyEmptyStreamMessages
        			return
        		}
        		goto waitForData
        	}
        	line = bytes.TrimPrefix(line, headerData)
        	if string(line) == "[DONE]" {
        		stream.isFinished = true
        		err = io.EOF
        		return
        	}
        	err = stream.unmarshaler.unmarshal(line, &response)
        	return
        }
        
        		if msg, ok := 
        			if msg == go_ZhiPuAI.ErrorsResponse {
        				return false
        			}
        			msgList = append(msgList, msg)
        			//marshal, _ := json.Marshal(msg)
        			c.SSEvent("chat", msg.Content)
        			//flusher.Flush() //确保立即发送
        			if msg == go_ZhiPuAI.StopResponse {
        				return false
        			}
        			return true
        		}
        		return true
        	})
        	// 将会话存入数据库
        	goUtil.New(func() {
        	})
        
    微信扫一扫加客服

    微信扫一扫加客服

    点击启动AI问答
    Draggable Icon