# TypeScript跑通第一个MCP,使用Deepseek查询sqlite数据库 **Published by:** [LinSoap](https://paragraph.com/@linsoap/) **Published on:** 2025-04-03 **URL:** https://paragraph.com/@linsoap/typescript-mcp-deepseek-sqlite ## Content 最近最火的AI技术应该就是MCP了,还不了解MCP概念的建议查看Model Context Protocol文档,其中详细介绍了MCP的概念和规范。本文使用官方的TypeScript SDK,请求到DeepSeek,从服务端到客户端简单实现一个Sqlite MCP,顺利开始MCP开发。初始化项目本项目所有资源都在这个git仓库TypeScript-MCP-Sqlite-Quickstart,包括了案例使用到的sqlite文件,你可以直接拉取,或者按照步骤初始化,自己创建一个数据库。 创建项目创建文件夹 mkdir TypeScript-MCP-Sqlite-Quickstart cd TypeScript-MCP-Sqlite-Quickstart 初始化npm npm init -y 安装相关依赖 npm install @modelcontextprotocol/sdk zod sqlite3 express @types/express openai npm install -D @types/node typescript touch index.ts touch server/sqlite_stdio.ts touch server/sqlite_sse.ts 配置package.json和tsconfig.json package.json{ "name": "typescript-mcp-sqlite-quickstart", "version": "1.0.0", "description": "", "main": "index.js", "scripts": { "build": "tsc && chmod 755 build/index.js" }, "keywords": [], "author": "", "license": "ISC", "type": "module", "dependencies": { "@modelcontextprotocol/sdk": "^1.8.0", "@types/express": "^5.0.1", "express": "^5.1.0", "openai": "^4.91.1", "sqlite3": "^5.1.7", "zod": "^3.24.2" }, "devDependencies": { "@types/node": "^22.13.17", "typescript": "^5.8.2" } } tsconfig.json{ "compilerOptions": { "target": "ES2022", "module": "Node16", "moduleResolution": "Node16", "outDir": "./build", "rootDir": "./", "strict": true, "esModuleInterop": true, "skipLibCheck": true, "forceConsistentCasingInFileNames": true }, "include": ["index.ts","server/**/*"], "exclude": ["node_modules"] } 编写MCP ServerMCP Server有两种启动方式,一种是Stdio,一种是SSE,在本节都会进行介绍。首先介绍Stdio方式。Stdio Transport在./server/sqlite_stdio.ts文件中添加import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import sqlite3 from "sqlite3"; import { promisify } from "util"; import { z } from "zod"; //创建一个MCPServer实例 export const sqliteServer = new McpServer({ name: "SQLite Explorer", version: "1.0.0" }); //初始化Sqlite const getDb = () => { const db = new sqlite3.Database("database.db"); return { all: promisify<string, any[]>(db.all.bind(db)), close: promisify(db.close.bind(db)) }; }; //定义一个server //第一个参数是tools名 //第二个参数是描述,向大模型介绍该工具的用途 //第三个参数是输入参数 //第四个参数是调用的方法 sqliteServer.tool( "query", "这是用于执行 SQLite 查询的工具,你可以使用它来执行任何有效的 SQL 查询,例如SELECT sql FROM sqlite_master WHERE type='table',或者SELECT * FROM table_name", { sql: z.string() }, async ({ sql }) => { const db = getDb(); try { const results = await db.all(sql); return { content: [{ type: "text", text: JSON.stringify(results, null, 2) }] }; } catch (err: unknown) { const error = err as Error; return { content: [{ type: "text", text: `Error: ${error.message}` }], isError: true }; } finally { await db.close(); } } ); //使用StdioTransport连接Server const transport = new StdioServerTransport(); await sqliteServer.connect(transport); 使用build编译ts文件,inspector是官方提供的一个MCP server检查工具,使用inspector检查server服务是否正常。//编译ts npm run build //运行检查工具 npx @modelcontextprotocol/inspector Stdio运行结果SSE Transport在./server/sqlite_sse.ts中添加import express, { Request, Response } from "express"; import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; import sqlite3 from "sqlite3"; import { promisify } from "util"; import { z } from "zod"; //初始化一个MCPServer实例 const sqliteServer = new McpServer({ name: "SQLite Explorer", version: "1.0.0" }); //初始化Sqlite const getDb = () => { const db = new sqlite3.Database("database.db"); return { all: promisify<string, any[]>(db.all.bind(db)), close: promisify(db.close.bind(db)) }; }; const app = express(); //定义一个server,该部分与stdio相同 sqliteServer.tool( "query", "这是用于执行 SQLite 查询的工具,你可以使用它来执行任何有效的 SQL 查询,例如SELECT sql FROM sqlite_master WHERE type='table',或者SELECT * FROM table_name", { sql: z.string() }, async ({ sql }) => { const db = getDb(); try { const results = await db.all(sql); return { content: [{ type: "text", text: JSON.stringify(results, null, 2) }] }; } catch (err: unknown) { const error = err as Error; return { content: [{ type: "text", text: `Error: ${error.message}` }], isError: true }; } finally { await db.close(); } } ); //使用SSETransport连接Server const transports: {[sessionId: string]: SSEServerTransport} = {}; //用于处理SSE连接的路由 app.get("/sse", async (_: Request, res: Response) => { const transport = new SSEServerTransport('/messages', res); transports[transport.sessionId] = transport; res.on("close", () => { delete transports[transport.sessionId]; }); await sqliteServer.connect(transport); }); //用于处理多个连接的路由 app.post("/messages", async (req: Request, res: Response) => { const sessionId = req.query.sessionId as string; const transport = transports[sessionId]; if (transport) { await transport.handlePostMessage(req, res); } else { res.status(400).send('No transport found for sessionId'); } }); app.listen(3001); 完成server编写后,同样要进行编译,还需要运行express服务,然后使用Inspector连接测试功能是否正常。//编译ts npm run build //运行express node ./build/server/sqlite_sse.js 打开Inspector,选择transport type为sse,设置url为http://localhost:3001/sse,点击连接,测试工具。SSE测试结果MCP客户端编写MCP客户端同样支持两种transport,一种是stdio,一种是sse,与服务端相对应。 在index.ts中添加import { Client } from "@modelcontextprotocol/sdk/client/index.js"; import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"; import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"; //使用StdioTransport连接Server //这里填写的参数与Inspector测试中填写的参数一致 const stdioTransport = new StdioClientTransport({ command: "node", args: ["./build/server/sqlite_stdio.js"] }); //使用SSETransport连接Server //这里填写的参数与Inspector测试中填写的参数一致 //const sseTransport = new SSEClientTransport(new URL("http://localhost:3001/sse")); const client = new Client( { name: "example-client", version: "1.0.0" }, { capabilities: { prompts: {}, resources: {}, tools: {} } } ); //连接StdioTransport和SSETransport,不过貌似只能同时连接一个, //按需选择连接方式 await client.connect(stdioTransport); // await client.connect(sseTransport); //获取工具列表 const tools = await client.listTools(); console.log("Tools:", tools); //获取资源列表 // const resources = await client.listResources(); //获取提示列表 // const prompts = await client.listPrompts(); 编写完MCP客户端后,编译ts,运行MCP客户端,如果列出可用工具列表,则说明客户端运行正常。//编译ts npm run build //运行客户端 node build/index.js MCP 客户端查询Tools结果向Deepseek发送请求调用MCPdeepseek可以使用openai sdk发送请求,在发送请求的时候在参数中带上tools即可。 由于openai的Tools参数定义和anthropic的tools参数定义不同,在获取到tools后按需进行转换。 转换函数const toolsResult = await client.listTools(); //anthropic格式的tools const anthropicTools = toolsResult.tools.map((tool) => { return { name: tool.name, description: tool.description, input_schema: tool.inputSchema, }; }); //openai格式的tools const openaiTools = toolsResult.tools.map((tool) => { return { "type": "function", "function": { "name": tool.name, "description": tool.description, "input_schema": tool.inputSchema, } } }) 至此,已经成功获取tools信息,只需要在请求deepseek的时候带上tools参数,然后监听response中的finish_reson,如果为tools_calls,则获取response中的toll_calls中的name和args。然后调用MCP客户端的callTool方法,将获取到的结果重新发送给Deepseek,Deepseek就成功获取到了数据库中的信息。const openai = new OpenAI({ apiKey: 'apikey', // 使用你的apikey,建议从环境变量读取 baseURL: 'https://api.deepseek.com/v1', }); //替换成你的问题 const messages: ChatCompletionMessageParam[] = [{ role: "user", content: "我有一个sqlite数据库,数据库中有expenses表和incomes表,告诉我我的收入和支出分别是多少?" }]; // 发送第一次请求,带上tools参数 const response = await openai.chat.completions.create({ model: "deepseek-chat", messages: messages, max_tokens: 1000, tools: openaiTools, } ); const content = response.choices[0]; //监听finish_reason,如果是tool_calls,说明大模型调用了工具 console.log("finish_resaon",content.finish_reason); if (content.finish_reason === "tool_calls" && content.message.tool_calls && content.message.tool_calls.length > 0) { //获取大模型返回工具调用的参数 const tool_call = content.message.tool_calls[0]; const toolName = tool_call.function.name; const toolArgs = JSON.parse(tool_call.function.arguments) as { [x: string]: unknown } | undefined; const result = await client.callTool({ name: toolName, arguments: toolArgs, }); console.log(`[大模型调用工具 ${toolName} 参数为 ${JSON.stringify(toolArgs)}]`) //将获取到的结果添加到messages中,准备发送给大模型 messages.push({ role: "user", content: result.content as string, }); //将获取到的结果添加到messages中,再发送一次请求,可以再在这个请求中带上tools,实现多轮调用,但是也需要更好的流程处理逻辑 const response = await openai.chat.completions.create({ model: "deepseek-chat", max_tokens: 1000, messages, }); console.log(response.choices[0].message.content); } 至此已经完成了Deepseek通过MCP获取数据库信息,使用以下指令查看调用情况。//编译ts npm run build //运行index.js node ./build/index.js 运行结果下一步至此,一个最简单的MCP流程已经完成,有非常多的地方可以完善。这个流程提供了最简单的sql查询功能。server描述做的也比较粗糙,并不是每一次大模型都能成功调用tool目前的请求方式是非流式的,可以改进为流式目前只支持一轮调用工具,无法实现多轮调用,允许MCP多轮调用能够更好发挥MCP的能力。目前一个客户端好像只支持连接一个服务端,在有多个服务的情况下,如何高效的管理客户端也是一个问题。 ## Publication Information - [LinSoap](https://paragraph.com/@linsoap/): Publication homepage - [All Posts](https://paragraph.com/@linsoap/): More posts from this publication - [RSS Feed](https://api.paragraph.com/blogs/rss/@linsoap): Subscribe to updates - [Twitter](https://twitter.com/LinSoap1024): Follow on Twitter