# TypeScript跑通第一个MCP，使用Deepseek查询sqlite数据库

By [LinSoap](https://paragraph.com/@linsoap) · 2025-04-03

---

最近最火的AI技术应该就是MCP了，还不了解MCP概念的建议查看[Model Context Protocol文档](https://modelcontextprotocol.io/introduction)，其中详细介绍了MCP的概念和规范。本文使用官方的TypeScript SDK，请求到DeepSeek，从服务端到客户端简单实现一个Sqlite MCP，顺利开始MCP开发。

初始化项目
=====

本项目所有资源都在这个git仓库[TypeScript-MCP-Sqlite-Quickstart](https://github.com/LinSoap/TypeScript-MCP-Sqlite-Quickstart?tab=readme-ov-file)，包括了案例使用到的sqlite文件，你可以直接拉取，或者按照步骤初始化，自己创建一个数据库。

创建项目

    创建文件夹
    mkdir TypeScript-MCP-Sqlite-Quickstart
    cd TypeScript-MCP-Sqlite-Quickstart
    
    初始化npm
    npm init -y
    
    安装相关依赖
    npm install @modelcontextprotocol/sdk zod sqlite3 express @types/express openai
    npm install -D @types/node typescript
    
    touch index.ts
    touch server/sqlite_stdio.ts
    touch server/sqlite_sse.ts
    

配置package.json和tsconfig.json **package.json**

    {
      "name": "typescript-mcp-sqlite-quickstart",
      "version": "1.0.0",
      "description": "",
      "main": "index.js",
      "scripts": {
        "build": "tsc && chmod 755 build/index.js"
      },
      "keywords": [],
      "author": "",
      "license": "ISC",
      "type": "module",
      "dependencies": {
        "@modelcontextprotocol/sdk": "^1.8.0",
        "@types/express": "^5.0.1",
        "express": "^5.1.0",
        "openai": "^4.91.1",
        "sqlite3": "^5.1.7",
        "zod": "^3.24.2"
      },
      "devDependencies": {
        "@types/node": "^22.13.17",
        "typescript": "^5.8.2"
      }
    }
    

**tsconfig.json**

    {
      "compilerOptions": {
        "target": "ES2022",
        "module": "Node16",
        "moduleResolution": "Node16",
        "outDir": "./build",
        "rootDir": "./",
        "strict": true,
        "esModuleInterop": true,
        "skipLibCheck": true,
        "forceConsistentCasingInFileNames": true
      },
      "include": ["index.ts","server/**/*"],
      "exclude": ["node_modules"]
    }
    

编写MCP Server
============

MCP Server有两种启动方式，一种是Stdio，一种是SSE，在本节都会进行介绍。首先介绍Stdio方式。

Stdio Transport
---------------

在./server/sqlite\_stdio.ts文件中添加

    import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
    import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
    import sqlite3 from "sqlite3";
    import { promisify } from "util";
    import { z } from "zod";
    
    //创建一个MCPServer实例
    export const sqliteServer = new McpServer({
      name: "SQLite Explorer",
      version: "1.0.0"
    });
    
    //初始化Sqlite
    const getDb = () => {
      const db = new sqlite3.Database("database.db");
      return {
        all: promisify<string, any[]>(db.all.bind(db)),
        close: promisify(db.close.bind(db))
      };
    };
    
    //定义一个server
    //第一个参数是tools名
    //第二个参数是描述，向大模型介绍该工具的用途
    //第三个参数是输入参数
    //第四个参数是调用的方法
    sqliteServer.tool(
      "query",
      "这是用于执行 SQLite 查询的工具,你可以使用它来执行任何有效的 SQL 查询,例如SELECT sql FROM sqlite_master WHERE type='table'，或者SELECT * FROM table_name",
      { sql: z.string() },
      async ({ sql }) => {
        const db = getDb();
        try {
          const results = await db.all(sql);
          return {
            content: [{
              type: "text",
              text: JSON.stringify(results, null, 2)
            }]
          };
        } catch (err: unknown) {
          const error = err as Error;
          return {
            content: [{
              type: "text",
              text: `Error: ${error.message}`
            }],
            isError: true
          };
        } finally {
          await db.close();
        }
      }
    );
    
    //使用StdioTransport连接Server
    const transport = new StdioServerTransport();
    await sqliteServer.connect(transport);
    

使用build编译ts文件，[inspector](https://github.com/modelcontextprotocol/inspector)是官方提供的一个MCP server检查工具，使用inspector检查server服务是否正常。

    //编译ts
    npm run build
    
    
    //运行检查工具
    npx @modelcontextprotocol/inspector
    

![Stdio运行结果](https://storage.googleapis.com/papyrus_images/87d3a6b8b7510455072ba380e654d6abe5123e1b33ba76eada4024bc04e4568d.webp)

Stdio运行结果

SSE Transport
-------------

在./server/sqlite\_sse.ts中添加

    import express, { Request, Response } from "express";
    import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
    import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
    import sqlite3 from "sqlite3";
    import { promisify } from "util";
    import { z } from "zod";
    
    //初始化一个MCPServer实例
    const sqliteServer = new McpServer({
      name: "SQLite Explorer",
      version: "1.0.0"
    });
    
    //初始化Sqlite
    const getDb = () => {
      const db = new sqlite3.Database("database.db");
      return {
        all: promisify<string, any[]>(db.all.bind(db)),
        close: promisify(db.close.bind(db))
      };
    };
    
    const app = express();
    
    //定义一个server，该部分与stdio相同
    sqliteServer.tool(
      "query",
      "这是用于执行 SQLite 查询的工具,你可以使用它来执行任何有效的 SQL 查询,例如SELECT sql FROM sqlite_master WHERE type='table'，或者SELECT * FROM table_name",
      { sql: z.string() },
      async ({ sql }) => {
        const db = getDb();
        try {
          const results = await db.all(sql);
          return {
            content: [{
              type: "text",
              text: JSON.stringify(results, null, 2)
            }]
          };
        } catch (err: unknown) {
          const error = err as Error;
          return {
            content: [{
              type: "text",
              text: `Error: ${error.message}`
            }],
            isError: true
          };
        } finally {
          await db.close();
        }
      }
    );
    
    //使用SSETransport连接Server
    const transports: {[sessionId: string]: SSEServerTransport} = {};
    
    //用于处理SSE连接的路由
    app.get("/sse", async (_: Request, res: Response) => {
      const transport = new SSEServerTransport('/messages', res);
      transports[transport.sessionId] = transport;
      res.on("close", () => {
        delete transports[transport.sessionId];
      });
      await sqliteServer.connect(transport);
    });
    
    //用于处理多个连接的路由
    app.post("/messages", async (req: Request, res: Response) => {
      const sessionId = req.query.sessionId as string;
      const transport = transports[sessionId];
      if (transport) {
        await transport.handlePostMessage(req, res);
      } else {
        res.status(400).send('No transport found for sessionId');
      }
    });
    
    app.listen(3001);
    

完成server编写后，同样要进行编译，还需要运行express服务，然后使用Inspector连接测试功能是否正常。

    //编译ts
    npm run build
    
    //运行express
    node ./build/server/sqlite_sse.js
    

打开Inspector，选择transport type为sse，设置url为[http://localhost:3001/sse](http://localhost:3001/sse)，点击连接，测试工具。

![SSE测试结果](https://storage.googleapis.com/papyrus_images/473bf49b01960363c7dcf855c3ad74de7e5589535e070f4549445a1a16dacbbe.webp)

SSE测试结果

MCP客户端编写
========

MCP客户端同样支持两种transport，一种是stdio，一种是sse，与服务端相对应。 在index.ts中添加

    import { Client } from "@modelcontextprotocol/sdk/client/index.js";
    import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
    import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
    
    //使用StdioTransport连接Server
    //这里填写的参数与Inspector测试中填写的参数一致
    const stdioTransport = new StdioClientTransport({
      command: "node",
      args: ["./build/server/sqlite_stdio.js"]
    });
    
    //使用SSETransport连接Server
    //这里填写的参数与Inspector测试中填写的参数一致
    //const sseTransport = new SSEClientTransport(new URL("http://localhost:3001/sse"));
    
    const client = new Client(
      {
        name: "example-client",
        version: "1.0.0"
      },
      {
        capabilities: {
          prompts: {},
          resources: {},
          tools: {}
        }
      }
    );
    
    //连接StdioTransport和SSETransport,不过貌似只能同时连接一个，
    //按需选择连接方式
    await client.connect(stdioTransport);
    // await client.connect(sseTransport);
    
    //获取工具列表
    const tools = await client.listTools();
    console.log("Tools:", tools);
    
    //获取资源列表
    // const resources = await client.listResources();
    
    //获取提示列表
    // const prompts = await client.listPrompts();
    

编写完MCP客户端后，编译ts，运行MCP客户端，如果列出可用工具列表，则说明客户端运行正常。

    //编译ts
    npm run build
    
    //运行客户端
    node build/index.js  
    

![MCP 客户端查询Tools结果](https://storage.googleapis.com/papyrus_images/58178244f866a9f2c05cdbd1b11ccfbf88588eadec3bd4b16c8f903cdfd11257.webp)

MCP 客户端查询Tools结果

向Deepseek发送请求调用MCP
==================

deepseek可以使用openai sdk发送请求，在发送请求的时候在参数中带上tools即可。 由于openai的Tools参数定义和anthropic的tools参数定义不同，在获取到tools后按需进行转换。 **转换函数**

    const toolsResult = await client.listTools();
    
    //anthropic格式的tools
    const anthropicTools = toolsResult.tools.map((tool) => {
            return {
              name: tool.name,
              description: tool.description,
              input_schema: tool.inputSchema,
            };
    });
    
    //openai格式的tools
    const openaiTools = toolsResult.tools.map((tool) => {
            return {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "input_schema": tool.inputSchema,
                }
            }
    })
    

至此，已经成功获取tools信息，只需要在请求deepseek的时候带上tools参数，然后监听response中的finish\_reson,如果为tools\_calls，则获取response中的toll\_calls中的name和args。然后调用MCP客户端的callTool方法，将获取到的结果重新发送给Deepseek，Deepseek就成功获取到了数据库中的信息。

    const openai = new OpenAI({
      apiKey: 'apikey',  // 使用你的apikey，建议从环境变量读取
      baseURL: 'https://api.deepseek.com/v1',
    });
    
    //替换成你的问题
    const messages: ChatCompletionMessageParam[] = [{ role: "user", content: "我有一个sqlite数据库，数据库中有expenses表和incomes表，告诉我我的收入和支出分别是多少？" }];
    
    // 发送第一次请求，带上tools参数
    const response = await openai.chat.completions.create({
            model: "deepseek-chat",
            messages: messages,
            max_tokens: 1000,
            tools: openaiTools,
      }
    );
    
    const content = response.choices[0];
    //监听finish_reason，如果是tool_calls，说明大模型调用了工具
    console.log("finish_resaon",content.finish_reason);
    if (content.finish_reason === "tool_calls" && content.message.tool_calls && content.message.tool_calls.length > 0) {
        //获取大模型返回工具调用的参数    
          const tool_call = content.message.tool_calls[0];
          const toolName = tool_call.function.name;
          const toolArgs = JSON.parse(tool_call.function.arguments) as { [x: string]: unknown } | undefined;
    
    
          const result = await client.callTool({
            name: toolName,
            arguments: toolArgs,
          });
    
          console.log(`[大模型调用工具 ${toolName}  参数为 ${JSON.stringify(toolArgs)}]`)
    
          //将获取到的结果添加到messages中，准备发送给大模型 
          messages.push({
            role: "user",
            content: result.content as string,
          });
    
          //将获取到的结果添加到messages中，再发送一次请求，可以再在这个请求中带上tools，实现多轮调用，但是也需要更好的流程处理逻辑
          const response = await openai.chat.completions.create({
            model: "deepseek-chat",
            max_tokens: 1000,
            messages,
          });
    
        console.log(response.choices[0].message.content);
    }
    

至此已经完成了Deepseek通过MCP获取数据库信息，使用以下指令查看调用情况。

    //编译ts
    npm run build
    
    //运行index.js
    node  ./build/index.js
    

![运行结果](https://storage.googleapis.com/papyrus_images/9f83909c6a38b41c303491fd1c679daef16c0e6680db5ef981cba70911a64dde.webp)

运行结果

下一步
===

至此，一个最简单的MCP流程已经完成，有非常多的地方可以完善。

*   这个流程提供了最简单的sql查询功能。server描述做的也比较粗糙，并不是每一次大模型都能成功调用tool
    
*   目前的请求方式是非流式的，可以改进为流式
    
*   目前只支持一轮调用工具，无法实现多轮调用，允许MCP多轮调用能够更好发挥MCP的能力。
    
*   目前一个客户端好像只支持连接一个服务端，在有多个服务的情况下，如何高效的管理客户端也是一个问题。

---

*Originally published on [LinSoap](https://paragraph.com/@linsoap/typescript-mcp-deepseek-sqlite)*
