Skip to content

pydantic_ai.mcp

MCPError

Bases: RuntimeError

Raised when an MCP server returns an error response.

This exception wraps error responses from MCP servers, following the ErrorData schema from the MCP specification.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
class MCPError(RuntimeError):
    """Raised when an MCP server returns an error response.

    This exception wraps error responses from MCP servers, following the ErrorData schema
    from the MCP specification.

    Instances can be copied and pickled: `__reduce__` rebuilds them with the full
    `(message, code, data)` argument list rather than with `args`, which only holds the message.
    """

    message: str
    """The error message."""

    code: int
    """The error code returned by the server."""

    data: dict[str, Any] | None
    """Additional information about the error, if provided by the server."""

    def __init__(self, message: str, code: int, data: dict[str, Any] | None = None):
        """Store the error details.

        Args:
            message: The error message.
            code: The error code returned by the server.
            data: Additional information about the error, if provided by the server.
        """
        self.message = message
        self.code = code
        self.data = data
        # Only the message is forwarded, so `args` stays `(message,)`.
        super().__init__(message)

    def __reduce__(self) -> tuple[Any, ...]:
        """Describe how to rebuild this exception for `copy` and `pickle`.

        The default exception implementation re-creates the object from `args`, i.e.
        `MCPError(message)`, which fails because `code` is a required argument.
        """
        return type(self), (self.message, self.code, self.data), self.__dict__

    @classmethod
    def from_mcp_sdk(cls, error: mcp_exceptions.McpError) -> MCPError:
        """Create an MCPError from an MCP SDK McpError.

        Args:
            error: An McpError from the MCP SDK.
        """
        # Extract error data from the McpError.error attribute
        error_data = error.error
        return cls(message=error_data.message, code=error_data.code, data=error_data.data)

    def __str__(self) -> str:
        """Format as `message (code: N)`, appending `data` only when it is truthy."""
        if self.data:
            return f'{self.message} (code: {self.code}, data: {self.data})'
        return f'{self.message} (code: {self.code})'

message instance-attribute

message: str = message

The error message.

code instance-attribute

code: int = code

The error code returned by the server.

data instance-attribute

data: dict[str, Any] | None = data

Additional information about the error, if provided by the server.

from_mcp_sdk classmethod

from_mcp_sdk(error: McpError) -> MCPError

Create an MCPError from an MCP SDK McpError.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| error | McpError | An McpError from the MCP SDK. | required |

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
156
157
158
159
160
161
162
163
164
165
@classmethod
def from_mcp_sdk(cls, error: mcp_exceptions.McpError) -> MCPError:
    """Build an MCPError from the MCP SDK's McpError.

    Args:
        error: The SDK exception; its `error` attribute supplies message, code and data.
    """
    payload = error.error
    fields = {'message': payload.message, 'code': payload.code, 'data': payload.data}
    return cls(**fields)

ResourceAnnotations dataclass

Additional properties describing MCP entities.

See the resource annotations in the MCP specification.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
@dataclass(repr=False, kw_only=True)
class ResourceAnnotations:
    """Optional hints attached to MCP entities: intended audience and priority.

    See the [resource annotations in the MCP specification](https://modelcontextprotocol.io/specification/2025-06-18/server/resources#annotations).
    """

    audience: list[mcp_types.Role] | None = None
    """Intended audience for this entity."""

    priority: Annotated[float, Field(ge=0.0, le=1.0)] | None = None
    """Priority level for this entity, ranging from 0.0 to 1.0."""

    __repr__ = _utils.dataclasses_no_defaults_repr

    @classmethod
    def from_mcp_sdk(cls, mcp_annotations: mcp_types.Annotations) -> ResourceAnnotations:
        """Build a `ResourceAnnotations` from the MCP SDK's `Annotations` object.

        Args:
            mcp_annotations: The MCP SDK annotations object; `audience` and `priority` are copied from it.
        """
        copied = {field_name: getattr(mcp_annotations, field_name) for field_name in ('audience', 'priority')}
        return cls(**copied)

audience class-attribute instance-attribute

audience: list[Role] | None = None

Intended audience for this entity.

priority class-attribute instance-attribute

priority: Annotated[float, Field(ge=0.0, le=1.0)] | None = (
    None
)

Priority level for this entity, ranging from 0.0 to 1.0.

from_mcp_sdk classmethod

from_mcp_sdk(
    mcp_annotations: Annotations,
) -> ResourceAnnotations

Convert from MCP SDK Annotations to ResourceAnnotations.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mcp_annotations | Annotations | The MCP SDK annotations object. | required |

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
188
189
190
191
192
193
194
195
@classmethod
def from_mcp_sdk(cls, mcp_annotations: mcp_types.Annotations) -> ResourceAnnotations:
    """Build a `ResourceAnnotations` from the MCP SDK's `Annotations` object.

    Args:
        mcp_annotations: The MCP SDK annotations object; `audience` and `priority` are copied from it.
    """
    copied = {field_name: getattr(mcp_annotations, field_name) for field_name in ('audience', 'priority')}
    return cls(**copied)

BaseResource dataclass

Bases: ABC

Base class for MCP resources.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
@dataclass(repr=False, kw_only=True)
class BaseResource(ABC):
    """Base class for MCP resources.

    Holds the descriptive fields common to resource-like objects. All fields are
    keyword-only (`kw_only=True`); only `name` is required, the rest default to `None`.
    """

    name: str
    """The programmatic name of the resource."""

    title: str | None = None
    """Human-readable title for UI contexts."""

    description: str | None = None
    """A description of what this resource represents."""

    mime_type: str | None = None
    """The MIME type of the resource, if known."""

    annotations: ResourceAnnotations | None = None
    """Optional annotations for the resource."""

    metadata: dict[str, Any] | None = None
    """Optional metadata for the resource."""

    # The generated dataclass repr is disabled (`repr=False`) in favour of this shared helper.
    # NOTE(review): judging by its name it omits fields left at their defaults — confirm in `_utils`.
    __repr__ = _utils.dataclasses_no_defaults_repr

name instance-attribute

name: str

The programmatic name of the resource.

title class-attribute instance-attribute

title: str | None = None

Human-readable title for UI contexts.

description class-attribute instance-attribute

description: str | None = None

A description of what this resource represents.

mime_type class-attribute instance-attribute

mime_type: str | None = None

The MIME type of the resource, if known.

annotations class-attribute instance-attribute

annotations: ResourceAnnotations | None = None

Optional annotations for the resource.

metadata class-attribute instance-attribute

metadata: dict[str, Any] | None = None

Optional metadata for the resource.

Resource dataclass

Bases: BaseResource

A resource that can be read from an MCP server.

See the resources in the MCP specification.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
@dataclass(repr=False, kw_only=True)
class Resource(BaseResource):
    """A concrete, readable resource exposed by an MCP server.

    See the [resources in the MCP specification](https://modelcontextprotocol.io/specification/2025-06-18/server/resources).
    """

    uri: str
    """The URI of the resource."""

    size: int | None = None
    """The size of the raw resource content in bytes (before base64 encoding), if known."""

    @classmethod
    def from_mcp_sdk(cls, mcp_resource: mcp_types.Resource) -> Resource:
        """Build a PydanticAI `Resource` from an MCP SDK `Resource`.

        The SDK's camelCase `mimeType` and its `meta` field are mapped to `mime_type` and
        `metadata`; the URI is converted to `str`.

        Args:
            mcp_resource: The MCP SDK Resource object.
        """
        sdk_annotations = mcp_resource.annotations
        converted = ResourceAnnotations.from_mcp_sdk(sdk_annotations) if sdk_annotations else None
        return cls(
            uri=str(mcp_resource.uri),
            name=mcp_resource.name,
            title=mcp_resource.title,
            description=mcp_resource.description,
            mime_type=mcp_resource.mimeType,
            size=mcp_resource.size,
            annotations=converted,
            metadata=mcp_resource.meta,
        )

uri instance-attribute

uri: str

The URI of the resource.

size class-attribute instance-attribute

size: int | None = None

The size of the raw resource content in bytes (before base64 encoding), if known.

from_mcp_sdk classmethod

from_mcp_sdk(mcp_resource: Resource) -> Resource

Convert from MCP SDK Resource to PydanticAI Resource.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mcp_resource | Resource | The MCP SDK Resource object. | required |

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
@classmethod
def from_mcp_sdk(cls, mcp_resource: mcp_types.Resource) -> Resource:
    """Build a PydanticAI `Resource` from an MCP SDK `Resource`.

    The SDK's camelCase `mimeType` and its `meta` field are mapped to `mime_type` and
    `metadata`; the URI is converted to `str`.

    Args:
        mcp_resource: The MCP SDK Resource object.
    """
    sdk_annotations = mcp_resource.annotations
    converted = ResourceAnnotations.from_mcp_sdk(sdk_annotations) if sdk_annotations else None
    return cls(
        uri=str(mcp_resource.uri),
        name=mcp_resource.name,
        title=mcp_resource.title,
        description=mcp_resource.description,
        mime_type=mcp_resource.mimeType,
        size=mcp_resource.size,
        annotations=converted,
        metadata=mcp_resource.meta,
    )

ResourceTemplate dataclass

Bases: BaseResource

A template for parameterized resources on an MCP server.

See the resource templates in the MCP specification.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
@dataclass(repr=False, kw_only=True)
class ResourceTemplate(BaseResource):
    """Describes a family of parameterized resources on an MCP server.

    See the [resource templates in the MCP specification](https://modelcontextprotocol.io/specification/2025-06-18/server/resources#resource-templates).
    """

    uri_template: str
    """URI template (RFC 6570) for constructing resource URIs."""

    @classmethod
    def from_mcp_sdk(cls, mcp_template: mcp_types.ResourceTemplate) -> ResourceTemplate:
        """Build a PydanticAI `ResourceTemplate` from an MCP SDK `ResourceTemplate`.

        The SDK's camelCase `uriTemplate` and `mimeType` and its `meta` field are mapped to
        `uri_template`, `mime_type` and `metadata`.

        Args:
            mcp_template: The MCP SDK ResourceTemplate object.
        """
        sdk_annotations = mcp_template.annotations
        converted = ResourceAnnotations.from_mcp_sdk(sdk_annotations) if sdk_annotations else None
        return cls(
            uri_template=mcp_template.uriTemplate,
            name=mcp_template.name,
            title=mcp_template.title,
            description=mcp_template.description,
            mime_type=mcp_template.mimeType,
            annotations=converted,
            metadata=mcp_template.meta,
        )

uri_template instance-attribute

uri_template: str

URI template (RFC 6570) for constructing resource URIs.

from_mcp_sdk classmethod

from_mcp_sdk(
    mcp_template: ResourceTemplate,
) -> ResourceTemplate

Convert from MCP SDK ResourceTemplate to PydanticAI ResourceTemplate.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mcp_template | ResourceTemplate | The MCP SDK ResourceTemplate object. | required |

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
@classmethod
def from_mcp_sdk(cls, mcp_template: mcp_types.ResourceTemplate) -> ResourceTemplate:
    """Build a PydanticAI `ResourceTemplate` from an MCP SDK `ResourceTemplate`.

    The SDK's camelCase `uriTemplate` and `mimeType` and its `meta` field are mapped to
    `uri_template`, `mime_type` and `metadata`.

    Args:
        mcp_template: The MCP SDK ResourceTemplate object.
    """
    sdk_annotations = mcp_template.annotations
    converted = ResourceAnnotations.from_mcp_sdk(sdk_annotations) if sdk_annotations else None
    return cls(
        uri_template=mcp_template.uriTemplate,
        name=mcp_template.name,
        title=mcp_template.title,
        description=mcp_template.description,
        mime_type=mcp_template.mimeType,
        annotations=converted,
        metadata=mcp_template.meta,
    )

ServerCapabilities dataclass

Capabilities that an MCP server supports.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
@dataclass(repr=False, kw_only=True)
class ServerCapabilities:
    """Flattened view of the capabilities an MCP server advertises."""

    experimental: list[str] | None = None
    """Experimental, non-standard capabilities that the server supports."""

    logging: bool = False
    """Whether the server supports sending log messages to the client."""

    prompts: bool = False
    """Whether the server offers any prompt templates."""

    prompts_list_changed: bool = False
    """Whether the server will emit notifications when the list of prompts changes."""

    resources: bool = False
    """Whether the server offers any resources to read."""

    resources_list_changed: bool = False
    """Whether the server will emit notifications when the list of resources changes."""

    tools: bool = False
    """Whether the server offers any tools to call."""

    tools_list_changed: bool = False
    """Whether the server will emit notifications when the list of tools changes."""

    completions: bool = False
    """Whether the server offers autocompletion suggestions for prompts and resources."""

    __repr__ = _utils.dataclasses_no_defaults_repr

    @classmethod
    def from_mcp_sdk(cls, mcp_capabilities: mcp_types.ServerCapabilities) -> ServerCapabilities:
        """Build a PydanticAI `ServerCapabilities` from the MCP SDK's `ServerCapabilities`.

        A capability counts as supported when its SDK field is not `None`; the
        `*_list_changed` flags are the truthiness of the sub-capability's `listChanged`.
        Only the keys of `experimental` are kept.

        Args:
            mcp_capabilities: The MCP SDK ServerCapabilities object.
        """

        def notifies(capability: Any) -> bool:
            # False for a missing (falsy) capability, else the truthiness of `listChanged`.
            return bool(capability and capability.listChanged)

        experimental = mcp_capabilities.experimental
        sdk_prompts = mcp_capabilities.prompts
        sdk_resources = mcp_capabilities.resources
        sdk_tools = mcp_capabilities.tools
        return cls(
            experimental=list(experimental.keys()) if experimental else None,
            logging=mcp_capabilities.logging is not None,
            prompts=sdk_prompts is not None,
            prompts_list_changed=notifies(sdk_prompts),
            resources=sdk_resources is not None,
            resources_list_changed=notifies(sdk_resources),
            tools=sdk_tools is not None,
            tools_list_changed=notifies(sdk_tools),
            completions=mcp_capabilities.completions is not None,
        )

experimental class-attribute instance-attribute

experimental: list[str] | None = None

Experimental, non-standard capabilities that the server supports.

logging class-attribute instance-attribute

logging: bool = False

Whether the server supports sending log messages to the client.

prompts class-attribute instance-attribute

prompts: bool = False

Whether the server offers any prompt templates.

prompts_list_changed class-attribute instance-attribute

prompts_list_changed: bool = False

Whether the server will emit notifications when the list of prompts changes.

resources class-attribute instance-attribute

resources: bool = False

Whether the server offers any resources to read.

resources_list_changed class-attribute instance-attribute

resources_list_changed: bool = False

Whether the server will emit notifications when the list of resources changes.

tools class-attribute instance-attribute

tools: bool = False

Whether the server offers any tools to call.

tools_list_changed class-attribute instance-attribute

tools_list_changed: bool = False

Whether the server will emit notifications when the list of tools changes.

completions class-attribute instance-attribute

completions: bool = False

Whether the server offers autocompletion suggestions for prompts and resources.

from_mcp_sdk classmethod

from_mcp_sdk(
    mcp_capabilities: ServerCapabilities,
) -> ServerCapabilities

Convert from MCP SDK ServerCapabilities to PydanticAI ServerCapabilities.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mcp_capabilities | ServerCapabilities | The MCP SDK ServerCapabilities object. | required |

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
@classmethod
def from_mcp_sdk(cls, mcp_capabilities: mcp_types.ServerCapabilities) -> ServerCapabilities:
    """Build a PydanticAI `ServerCapabilities` from the MCP SDK's `ServerCapabilities`.

    A capability counts as supported when its SDK field is not `None`; the
    `*_list_changed` flags are the truthiness of the sub-capability's `listChanged`.
    Only the keys of `experimental` are kept.

    Args:
        mcp_capabilities: The MCP SDK ServerCapabilities object.
    """

    def notifies(capability: Any) -> bool:
        # False for a missing (falsy) capability, else the truthiness of `listChanged`.
        return bool(capability and capability.listChanged)

    experimental = mcp_capabilities.experimental
    sdk_prompts = mcp_capabilities.prompts
    sdk_resources = mcp_capabilities.resources
    sdk_tools = mcp_capabilities.tools
    return cls(
        experimental=list(experimental.keys()) if experimental else None,
        logging=mcp_capabilities.logging is not None,
        prompts=sdk_prompts is not None,
        prompts_list_changed=notifies(sdk_prompts),
        resources=sdk_resources is not None,
        resources_list_changed=notifies(sdk_resources),
        tools=sdk_tools is not None,
        tools_list_changed=notifies(sdk_tools),
        completions=mcp_capabilities.completions is not None,
    )

MCPServer

Bases: AbstractToolset[Any], ABC

Base class for attaching agents to MCP servers.

See https://modelcontextprotocol.io for more information.

Deprecated

This class hierarchy (MCPServer, MCPServerStdio, MCPServerSSE, MCPServerStreamableHTTP, MCPServerHTTP) is deprecated in favor of MCPToolset, which is built on the more capable FastMCP client and supports the full MCP protocol. The concrete subclasses will be removed in v2.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
class MCPServer(AbstractToolset[Any], ABC):
    """Base class for attaching agents to MCP servers.

    See <https://modelcontextprotocol.io> for more information.

    !!! warning "Deprecated"
        This class hierarchy (`MCPServer`, `MCPServerStdio`, `MCPServerSSE`,
        `MCPServerStreamableHTTP`, `MCPServerHTTP`) is deprecated in favor of
        [`MCPToolset`][pydantic_ai.mcp.MCPToolset], which is built on the more capable FastMCP
        client and supports the full MCP protocol. The concrete subclasses will be removed in v2.
    """

    tool_prefix: str | None
    """A prefix to add to all tools that are registered with the server.

    If not empty, will include a trailing underscore(`_`).

    e.g. if `tool_prefix='foo'`, then a tool named `bar` will be registered as `foo_bar`
    """

    log_level: mcp_types.LoggingLevel | None
    """The log level to set when connecting to the server, if any.

    See <https://modelcontextprotocol.io/specification/2025-03-26/server/utilities/logging#logging> for more details.

    If `None`, no log level will be set.
    """

    log_handler: LoggingFnT | None
    """A handler for logging messages from the server."""

    timeout: float
    """The timeout in seconds to wait for the client to initialize."""

    read_timeout: float
    """Maximum time in seconds to wait for new messages before timing out.

    This timeout applies to the long-lived connection after it's established.
    If no new messages are received within this time, the connection will be considered stale
    and may be closed. Defaults to 5 minutes (300 seconds).
    """

    process_tool_call: ProcessToolCallback | None
    """Hook to customize tool calling and optionally pass extra metadata."""

    allow_sampling: bool
    """Whether to allow MCP sampling through this client."""

    sampling_model: models.Model | None
    """The model to use for sampling."""

    max_retries: int
    """The maximum number of times to retry a tool call."""

    elicitation_callback: ElicitationFnT | None = None
    """Callback function to handle elicitation requests from the server."""

    cache_tools: bool
    """Whether to cache the list of tools.

    When enabled (default), tools are fetched once and cached until either:
    - The server sends a `notifications/tools/list_changed` notification
    - [`MCPServer.__aexit__`][pydantic_ai.mcp.MCPServer.__aexit__] is called (when the last context exits)

    Set to `False` for servers that change tools dynamically without sending notifications.

    Note: When using durable execution (Temporal, DBOS), tool definitions are additionally cached
    at the wrapper level across activities/steps, to avoid redundant MCP connections. This
    wrapper-level cache is not invalidated by `tools/list_changed` notifications.
    Set to `False` to disable all caching if tools may change during a workflow.
    """

    cache_resources: bool
    """Whether to cache the list of resources.

    When enabled (default), resources are fetched once and cached until either:
    - The server sends a `notifications/resources/list_changed` notification
    - [`MCPServer.__aexit__`][pydantic_ai.mcp.MCPServer.__aexit__] is called (when the last context exits)

    Set to `False` for servers that change resources dynamically without sending notifications.
    """

    include_instructions: bool
    """Whether to include the server's instructions in the agent's instructions.

    Defaults to `False` for backward compatibility.
    """

    include_return_schema: bool | None
    """Whether to include return schemas in tool definitions sent to the model.

    When `None` (default), defaults to `False` unless the
    [`IncludeToolReturnSchemas`][pydantic_ai.capabilities.IncludeToolReturnSchemas] capability is used.
    """

    _id: str | None

    _session_state: _MCPSessionState = field(compare=False)

    _server_info: mcp_types.Implementation
    _server_capabilities: ServerCapabilities
    _instructions: str | None

    _cached_tools: list[mcp_types.Tool] | None
    _cached_resources: list[Resource] | None

    @functools.cached_property
    def _enter_lock(self) -> anyio.Lock:
        """Return this instance's `anyio.Lock`, created on first access.

        `functools.cached_property` stores the result on the instance, so later accesses
        return the same lock object.
        NOTE(review): presumably used to serialize entering the server context — the use site is not shown here.
        """
        return anyio.Lock()

    # TODO (v2): enforce the arguments to be passed as keyword arguments only
    def __init__(
        self,
        tool_prefix: str | None = None,
        log_level: mcp_types.LoggingLevel | None = None,
        log_handler: LoggingFnT | None = None,
        timeout: float = 5,
        read_timeout: float = 5 * 60,
        process_tool_call: ProcessToolCallback | None = None,
        allow_sampling: bool = True,
        sampling_model: models.Model | None = None,
        max_retries: int = 1,
        elicitation_callback: ElicitationFnT | None = None,
        cache_tools: bool = True,
        cache_resources: bool = True,
        *,
        include_instructions: bool = False,
        include_return_schema: bool | None = None,
        id: str | None = None,
        client_info: mcp_types.Implementation | None = None,
    ):
        """Store the configuration and reset the per-instance runtime state.

        Args:
            tool_prefix: A prefix to add to all tools that are registered with the server.
            log_level: The log level to set when connecting to the server, if any.
            log_handler: A handler for logging messages from the server.
            timeout: The timeout in seconds to wait for the client to initialize.
            read_timeout: Maximum time in seconds to wait for new messages before timing out.
            process_tool_call: Hook to customize tool calling and optionally pass extra metadata.
            allow_sampling: Whether to allow MCP sampling through this client.
            sampling_model: The model to use for sampling.
            max_retries: The maximum number of times to retry a tool call.
            elicitation_callback: Callback function to handle elicitation requests from the server.
            cache_tools: Whether to cache the list of tools.
            cache_resources: Whether to cache the list of resources.
            include_instructions: Whether to include the server's instructions in the agent's instructions.
            include_return_schema: Whether to include return schemas in tool definitions sent to the model.
            id: Identifier for this server; when falsy, `tool_prefix` is used instead.
            client_info: Stored as `self.client_info`.
                NOTE(review): presumably the client implementation info sent to the server — confirm at the use site.
        """
        self.tool_prefix = tool_prefix
        self.log_level = log_level
        self.log_handler = log_handler
        self.timeout = timeout
        self.read_timeout = read_timeout
        self.process_tool_call = process_tool_call
        self.allow_sampling = allow_sampling
        self.sampling_model = sampling_model
        self.max_retries = max_retries
        self.elicitation_callback = elicitation_callback
        self.cache_tools = cache_tools
        self.cache_resources = cache_resources
        self.include_instructions = include_instructions
        self.include_return_schema = include_return_schema
        self.client_info = client_info

        # Fall back to the tool prefix as the identifier when no (truthy) id is given.
        self._id = id or tool_prefix

        # This class defines `__init__` by hand, so `__post_init__` has to be invoked explicitly.
        self.__post_init__()

    def __post_init__(self) -> None:
        """Reset runtime state: a fresh `_MCPSessionState` and empty tool and resource caches.

        Called explicitly at the end of `__init__`.
        """
        self._session_state = _MCPSessionState()
        self._cached_tools = None
        self._cached_resources = None

    @abstractmethod
    @asynccontextmanager
    async def client_streams(
        self,
    ) -> AsyncIterator[
        tuple[
            MemoryObjectReceiveStream[SessionMessage | Exception],
            MemoryObjectSendStream[SessionMessage],
        ]
    ]:
        """Create the streams for the MCP server.

        Abstract: subclasses must override this. Per the annotation, an implementation yields a
        `(receive_stream, send_stream)` pair from an async context manager.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError('MCP Server subclasses must implement this method.')
        # Unreachable, but the `yield` makes this an async generator function,
        # which is what `asynccontextmanager` wraps.
        yield

    @property
    def id(self) -> str | None:
        """The identifier stored in `_id` (set in `__init__` to `id or tool_prefix`)."""
        return self._id

    @id.setter
    def id(self, value: str | None) -> None:
        """Replace the stored identifier."""
        self._id = value

    @property
    def label(self) -> str:
        """Label for this server: the parent class's label when an `id` is set, else `repr(self)`."""
        if not self.id:
            return repr(self)
        return super().label  # pragma: no cover

    @property
    def tool_name_conflict_hint(self) -> str:
        """Fixed hint text advising to set `tool_prefix` to avoid tool name conflicts."""
        return 'Set the `tool_prefix` attribute to avoid name conflicts.'

    @property
    def server_info(self) -> mcp_types.Implementation:
        """Access the information sent by the MCP server during initialization.

        Raises:
            AttributeError: If `_server_info` is missing or `None`.
        """
        info = getattr(self, '_server_info', None)
        if info is not None:
            return info
        raise AttributeError(
            f'The `{self.__class__.__name__}.server_info` is only instantiated after initialization.'
        )

    @property
    def capabilities(self) -> ServerCapabilities:
        """Access the capabilities advertised by the MCP server during initialization.

        Raises:
            AttributeError: If `_server_capabilities` is missing or `None`.
        """
        advertised = getattr(self, '_server_capabilities', None)
        if advertised is not None:
            return advertised
        raise AttributeError(
            f'The `{self.__class__.__name__}.capabilities` is only instantiated after initialization.'
        )

    @property
    def instructions(self) -> str | None:
        """Access the instructions sent by the MCP server during initialization.

        `None` is a valid stored value, so availability is decided by whether the
        `_instructions` attribute exists at all.

        Raises:
            AttributeError: If `_instructions` has not been set on this instance.
        """
        if hasattr(self, '_instructions'):
            return self._instructions
        raise AttributeError(
            f'The `{self.__class__.__name__}.instructions` is only available after initialization.'
        )

    async def get_instructions(self, ctx: RunContext[Any]) -> messages.InstructionPart | None:
        """Return the MCP server's instructions as an instruction part, if enabled and available.

        Nothing is returned unless
        [`include_instructions`][pydantic_ai.mcp.MCPServer.include_instructions] is `True`. When it is,
        the [`instructions`][pydantic_ai.mcp.MCPServer.instructions] sent by the MCP server during
        initialization are wrapped in an `InstructionPart`.

        Instructions from external servers are marked as dynamic since they may change between connections.

        Args:
            ctx: The run context for this agent run.

        Returns:
            An `InstructionPart` with the server's instructions, or `None` when `include_instructions`
            is disabled, the server is not initialized yet, or the server sent no instructions.
        """
        if not self.include_instructions:
            return None

        try:
            text = self.instructions
        except AttributeError:
            # Server not yet initialized — return None rather than propagating.
            # Durable execution wrappers detect this and fetch via activity/step.
            return None

        if text is None:
            return None
        return messages.InstructionPart(content=text, dynamic=True)

    async def list_tools(self) -> list[mcp_types.Tool]:
        """Return the tools currently active on the server, served from the cache when possible.

        With `cache_tools` enabled, a previously stored list is returned without contacting
        the server. Otherwise the list is fetched inside `async with self` and, if caching is
        enabled, stored for later calls.

        Per the `cache_tools` documentation, the cache is invalidated on:
        - `notifications/tools/list_changed` notifications from the server
        - `__aexit__` when the last context exits

        Set `cache_tools=False` for servers that change tools without sending notifications.
        """
        if self.cache_tools:
            cached = self._cached_tools
            if cached is not None:
                return cached

        async with self:
            listing = await self._get_client().list_tools()
            tools = listing.tools
            if self.cache_tools:
                self._cached_tools = tools
            return tools

    async def direct_call_tool(
        self,
        name: str,
        args: dict[str, Any],
        metadata: dict[str, Any] | None = None,
    ) -> ToolResult:
        """Call a tool on the server.

        Sends a `tools/call` request and converts the result: structured content is preferred
        when every content part is text, otherwise each content part is mapped individually.

        Args:
            name: The name of the tool to call.
            args: The arguments to pass to the tool.
            metadata: Request-level metadata (optional)

        Returns:
            The result of the tool call. A single mapped content part is returned on its own,
            several parts as a list.

        Raises:
            ModelRetry: If the server responds with an MCP error or the result is flagged as an error.
        """
        async with self:  # Ensure server is running
            try:
                result = await self._get_client().send_request(
                    mcp_types.ClientRequest(
                        mcp_types.CallToolRequest(
                            method='tools/call',
                            params=mcp_types.CallToolRequestParams(
                                name=name,
                                arguments=args,
                                _meta=mcp_types.RequestParams.Meta(**metadata) if metadata else None,
                            ),
                        )
                    ),
                    mcp_types.CallToolResult,
                )
            except mcp_exceptions.McpError as e:
                # Chain explicitly so the SDK error is kept as `__cause__` of the retry.
                raise exceptions.ModelRetry(e.error.message) from e

        if result.isError:
            message: str | None = None
            if result.content:  # pragma: no branch
                text_parts = [part.text for part in result.content if isinstance(part, mcp_types.TextContent)]
                message = '\n'.join(text_parts)

            # An empty or missing message falls back to a generic one.
            raise exceptions.ModelRetry(message or 'MCP tool call failed')

        # Prefer structured content if there are only text parts, which per the docs would contain the JSON-encoded structured content for backward compatibility.
        # See https://github.com/modelcontextprotocol/python-sdk#structured-output
        if (structured := result.structuredContent) and all(
            isinstance(part, mcp_types.TextContent) for part in result.content
        ):
            # The MCP SDK wraps primitives and generic types like list in a `result` key, but we want to use the raw value returned by the tool function.
            # See https://github.com/modelcontextprotocol/python-sdk#structured-output
            if isinstance(structured, dict) and len(structured) == 1 and 'result' in structured:
                return structured['result']
            return structured

        mapped = [await self._map_tool_result_part(part) for part in result.content]
        return mapped[0] if len(mapped) == 1 else mapped

    async def call_tool(
        self,
        name: str,
        tool_args: dict[str, Any],
        ctx: RunContext[Any],
        tool: ToolsetTool[Any],
    ) -> ToolResult:
        """Run a tool through this server, honouring the prefix and the optional call hook.

        When `tool_prefix` is set, the leading `<prefix>_` is stripped from `name` and the
        run context is copied with the stripped name as its `tool_name`. The call then goes
        through `process_tool_call` if one is configured, otherwise straight to
        `direct_call_tool`.
        """
        prefix = self.tool_prefix
        if prefix:
            name = name.removeprefix(f'{prefix}_')
            ctx = replace(ctx, tool_name=name)

        hook = self.process_tool_call
        if hook is None:
            return await self.direct_call_tool(name, tool_args)
        return await hook(ctx, self.direct_call_tool, name, tool_args)

    async def get_tools(self, ctx: RunContext[Any]) -> dict[str, ToolsetTool[Any]]:
        """Build the name -> tool mapping from the tools currently listed by the server.

        Keys are the server's tool names, with `<tool_prefix>_` prepended when a prefix is
        configured. Tools whose resulting name is empty are left out.
        """
        listed = await self.list_tools()
        prefix = self.tool_prefix

        tools: dict[str, ToolsetTool[Any]] = {}
        for mcp_tool in listed:
            name = f'{prefix}_{mcp_tool.name}' if prefix else mcp_tool.name
            if not name:
                continue

            output_schema = mcp_tool.outputSchema or None
            annotations = mcp_tool.annotations.model_dump() if mcp_tool.annotations else None
            tool_def = ToolDefinition(
                name=name,
                description=mcp_tool.description,
                parameters_json_schema=mcp_tool.inputSchema,
                metadata={
                    'meta': mcp_tool.meta,
                    'annotations': annotations,
                    'output_schema': output_schema,
                },
                return_schema=output_schema,
                include_return_schema=self.include_return_schema,
            )
            tools[name] = self.tool_for_tool_def(tool_def)
        return tools

    def tool_for_tool_def(self, tool_def: ToolDefinition) -> ToolsetTool[Any]:
        """Wrap a tool definition in a `ToolsetTool` that belongs to this server.

        The wrapper carries this server's `max_retries` and validates arguments with
        `TOOL_SCHEMA_VALIDATOR`.
        """
        wrapped = ToolsetTool(
            args_validator=TOOL_SCHEMA_VALIDATOR,
            max_retries=self.max_retries,
            tool_def=tool_def,
            toolset=self,
        )
        return wrapped

    async def list_resources(self) -> list[Resource]:
        """Return the resources the server currently exposes.

        With `cache_resources` enabled (the default) the list is served from the cache once
        it has been fetched. The cache is dropped on:
        - `notifications/resources/list_changed` notifications from the server
        - `__aexit__` when the last context exits

        Use `cache_resources=False` for servers that change resources without sending notifications.

        An empty list is returned (and nothing is cached) when the server does not advertise
        the resources capability.

        Raises:
            MCPError: If the server returns an error.
        """
        if self.cache_resources:
            cached = self._cached_resources
            if cached is not None:
                return cached

        async with self:
            if not self.capabilities.resources:
                return []
            try:
                listing = await self._get_client().list_resources()
                converted = [Resource.from_mcp_sdk(item) for item in listing.resources]
                if self.cache_resources:
                    self._cached_resources = converted
                return converted
            except mcp_exceptions.McpError as e:
                raise MCPError.from_mcp_sdk(e) from e

    async def list_resource_templates(self) -> list[ResourceTemplate]:
        """Return the resource templates the server currently exposes.

        An empty list is returned when the server does not advertise the resources capability.

        Raises:
            MCPError: If the server returns an error.
        """
        async with self:  # The connection must be up before talking to the server
            if not self.capabilities.resources:
                return []
            client = self._get_client()
            try:
                response = await client.list_resource_templates()
            except mcp_exceptions.McpError as e:
                raise MCPError.from_mcp_sdk(e) from e

        templates = response.resourceTemplates
        return [ResourceTemplate.from_mcp_sdk(template) for template in templates]

    @overload
    async def read_resource(self, uri: str) -> str | messages.BinaryContent | list[str | messages.BinaryContent]: ...

    @overload
    async def read_resource(
        self, uri: Resource
    ) -> str | messages.BinaryContent | list[str | messages.BinaryContent]: ...

    async def read_resource(
        self, uri: str | Resource
    ) -> str | messages.BinaryContent | list[str | messages.BinaryContent]:
        """Fetch the contents of one resource, identified by URI or by a `Resource` object.

        Args:
            uri: The URI of the resource to read, or a Resource object (its `uri` is used).

        Returns:
            The single content item when the server returns exactly one, otherwise a list
            with every content item.

        Raises:
            MCPError: If the server returns an error.
        """
        target = uri if isinstance(uri, str) else uri.uri
        async with self:  # The connection must be up before talking to the server
            try:
                response = await self._get_client().read_resource(AnyUrl(target))
            except mcp_exceptions.McpError as e:
                raise MCPError.from_mcp_sdk(e) from e

        items = [self._get_content(entry) for entry in response.contents]
        if len(items) == 1:
            return items[0]
        return items

    def _get_client(self) -> ClientSession:
        """Return the live client session, or raise `RuntimeError` when none is connected."""
        if (session := self._session_state.client) is not None:
            return session
        raise RuntimeError(  # pragma: no cover
            f'{self.__class__.__name__} is not connected. Use `async with server:` to open a connection first.'
        )

    async def _session_runner(self) -> None:
        """Own the MCP session's lifecycle for this server.

        Entered AND exited inside this single dedicated asyncio.Task, so the underlying
        anyio cancel scopes (from stdio_client / streamable_http_client / etc.) are
        always exited in the same task they were entered in.

        The runner opens the transport streams and the `ClientSession`, initializes the
        session, publishes the client on the shared session state, signals readiness and
        then waits for the stop event. Any failure is stored in `connect_error` (if this
        runner is still the active session task) instead of being raised.
        """
        state = self._session_state
        # Capture local references so a recycled session (new __aenter__ replacing
        # state.ready_event/state.stop_event before this runner's `finally` runs)
        # cannot corrupt the next session's events.
        ready_event = state.ready_event
        stop_event = state.stop_event
        assert ready_event is not None
        assert stop_event is not None
        client: ClientSession | None = None
        try:
            # The exit stack closes the session and then the transport streams when this block is left.
            async with AsyncExitStack() as stack:
                read_stream, write_stream = await stack.enter_async_context(self.client_streams())
                session = ClientSession(
                    read_stream=read_stream,
                    write_stream=write_stream,
                    sampling_callback=self._sampling_callback if self.allow_sampling else None,
                    elicitation_callback=self.elicitation_callback,
                    logging_callback=self.log_handler,
                    read_timeout_seconds=timedelta(seconds=self.read_timeout),
                    message_handler=self._handle_notification,
                    client_info=self.client_info,
                )
                client = await stack.enter_async_context(session)

                # `timeout` bounds only the initialization handshake (and the optional log-level request).
                with anyio.fail_after(self.timeout):
                    result = await client.initialize()
                    self._server_info = result.serverInfo
                    self._server_capabilities = ServerCapabilities.from_mcp_sdk(result.capabilities)
                    self._instructions = result.instructions
                    if log_level := self.log_level:
                        await client.set_logging_level(log_level)

                # Publish the client, wake the waiter in __aenter__, then park until the stop event is set.
                state.client = client
                ready_event.set()
                await stop_event.wait()
        except BaseException as e:
            # Only record the error if we are still the active session — otherwise
            # __aenter__ has already moved on with a fresh session_task.
            if state.session_task is asyncio.current_task():
                state.connect_error = e
        finally:
            # Only clear state.client if it still references *our* client; a
            # recycled session may have already installed a new one.
            if state.client is client:
                state.client = None
            # Always release a waiter in __aenter__, even on failure, so it can inspect connect_error.
            ready_event.set()

    async def __aenter__(self) -> Self:
        """Enter the MCP server context.

        The first call starts the connection (spawning a subprocess for stdio servers,
        opening an HTTP connection for HTTP servers). Subsequent calls — from any task
        — share the same connection via reference counting. The connection is torn
        down when the last `async with` scope exits.

        Because the session runs in a dedicated background task, entering and exiting
        from different tasks (e.g. `asyncio.gather` children, fasta2a workers, or
        graph node tasks) is safe: the underlying transport's cancel scopes never
        cross task boundaries.

        Returns:
            This server instance.

        Raises:
            BaseException: Whatever error the session runner recorded while connecting
                is re-raised here.
        """
        async with self._enter_lock:
            state = self._session_state
            # Start a new session only when there is none, or the previous runner has already finished.
            need_to_start = state.session_task is None or state.session_task.done()
            if need_to_start:
                # Fresh events and cleared error/client for the new runner task.
                state.stop_event = anyio.Event()
                state.ready_event = anyio.Event()
                state.connect_error = None
                state.client = None
                state.session_task = asyncio.create_task(self._session_runner())
                try:
                    # The runner sets this event both on success and on failure.
                    await state.ready_event.wait()
                except BaseException:
                    # Cancelled while waiting for startup: tear down the session task
                    # without impacting anyone else (we hold the lock and just started it)
                    task = state.session_task
                    state.stop_event.set()
                    await state.force_close(task)
                    state.session_task = None
                    state.client = None
                    raise
                if state.connect_error is not None:
                    # Connection failed during startup; surface the error and reset state
                    state.session_task = None
                    err = state.connect_error
                    state.connect_error = None
                    raise err
            # Count this entry only once the session is up; __aexit__ decrements it again.
            state.nesting_counter += 1
        return self

    async def __aexit__(self, *args: Any) -> bool | None:
        """Leave the MCP server context.

        Decrements the nesting counter. When it reaches zero and a session task exists,
        the stop event is set, the cached tools and resources are cleared, and the session
        task is awaited for at most `_SHUTDOWN_GRACE_SECONDS`.

        Args:
            *args: The usual exception triple passed to `__aexit__`; it is not inspected.

        Returns:
            Always `None`, so an exception raised inside the `async with` body is not suppressed.

        Raises:
            ValueError: If called more times than `__aenter__`.
        """
        state = self._session_state
        session_task_to_await: asyncio.Task[None] | None = None
        async with self._enter_lock:
            if state.nesting_counter == 0:
                raise ValueError('MCPServer.__aexit__ called more times than __aenter__')
            state.nesting_counter -= 1
            # Other contexts are still open: keep the shared session alive.
            if state.nesting_counter > 0:
                return None
            # Nothing to tear down if no session task is registered.
            if state.session_task is None:
                return None
            assert state.stop_event is not None
            # Wake the runner, which is parked on this event, so it unwinds its own contexts.
            state.stop_event.set()
            session_task_to_await = state.session_task
            state.session_task = None
            # Caches are only valid for the connection that produced them.
            self._cached_tools = None
            self._cached_resources = None
        # Await outside the lock: the session task's cancel scopes unwind inside the
        # task itself, so this await can safely happen from any caller. Bound the
        # wait so a transport whose `__aexit__` deadlocks (hung subprocess, server
        # that never closes the connection) cannot block our own shutdown forever;
        # `move_on_after` cancels this `await`, which propagates the cancel through
        # to `session_task_to_await` itself, so the runner gets torn down too.
        with anyio.move_on_after(_SHUTDOWN_GRACE_SECONDS):
            try:
                await session_task_to_await
            except BaseException:
                # Best-effort shutdown: whatever the awaited task raises here is deliberately ignored.
                pass
        return None

    @property
    def is_running(self) -> bool:
        """Whether at least one `async with` context on this server is currently open."""
        open_contexts = self._session_state.nesting_counter
        return open_contexts > 0

    async def _sampling_callback(
        self, context: RequestContext[ClientSession, Any], params: mcp_types.CreateMessageRequestParams
    ) -> mcp_types.CreateMessageResult | mcp_types.ErrorData:
        """Serve an MCP sampling request by running it against `sampling_model`.

        The request's max tokens, temperature and stop sequences are forwarded as model
        settings; the model's response is returned as an assistant message.
        """
        if self.sampling_model is None:
            raise ValueError('Sampling model is not set')  # pragma: no cover

        request_messages = _mcp.map_from_mcp_params(params)

        settings = ModelSettings(max_tokens=params.maxTokens)
        temperature = params.temperature
        if temperature is not None:  # pragma: no branch
            settings['temperature'] = temperature
        stop_sequences = params.stopSequences
        if stop_sequences is not None:  # pragma: no branch
            settings['stop_sequences'] = stop_sequences

        response = await model_request(self.sampling_model, request_messages, model_settings=settings)
        reply_content = _mcp.map_from_model_response(response)
        return mcp_types.CreateMessageResult(
            role='assistant',
            content=reply_content,
            model=self.sampling_model.model_name,
        )

    async def _handle_notification(
        self,
        message: RequestResponder[mcp_types.ServerRequest, mcp_types.ClientResult]
        | mcp_types.ServerNotification
        | Exception,
    ) -> None:
        """React to server notifications by dropping the cache they invalidate.

        A tool-list-changed notification clears the cached tools, a resource-list-changed
        notification clears the cached resources. Every other message is ignored.
        """
        if not isinstance(message, mcp_types.ServerNotification):  # pragma: no cover
            return

        notification = message.root
        if isinstance(notification, mcp_types.ToolListChangedNotification):
            self._cached_tools = None
        elif isinstance(notification, mcp_types.ResourceListChangedNotification):
            self._cached_resources = None

    async def _map_tool_result_part(
        self, part: mcp_types.ContentBlock
    ) -> str | messages.BinaryContent | dict[str, Any] | list[Any]:
        """Convert one MCP content block of a tool result into a plain value.

        Text that starts with `[` or `{` is parsed as JSON when it is valid JSON and is
        otherwise returned unchanged; image and audio blocks become binary content;
        embedded resources are unwrapped; resource links are read from the server.
        """
        # See https://github.com/jlowin/fastmcp/blob/main/docs/servers/tools.mdx#return-values

        if isinstance(part, mcp_types.TextContent):
            raw = part.text
            if not raw.startswith(('[', '{')):
                return raw
            try:
                return pydantic_core.from_json(raw)
            except ValueError:
                # Looked like JSON but is not: hand the text back untouched.
                return raw

        if isinstance(part, mcp_types.ImageContent):
            image_bytes = base64.b64decode(part.data)
            return messages.BinaryImage(data=image_bytes, media_type=part.mimeType)

        if isinstance(part, mcp_types.AudioContent):
            # NOTE: The FastMCP server doesn't support audio content.
            # See <https://github.com/modelcontextprotocol/python-sdk/issues/952> for more details.
            return messages.BinaryContent(
                data=base64.b64decode(part.data), media_type=part.mimeType
            )  # pragma: no cover

        if isinstance(part, mcp_types.EmbeddedResource):
            return self._get_content(part.resource)

        if isinstance(part, mcp_types.ResourceLink):
            return await self.read_resource(str(part.uri))

        assert_never(part)

    def _get_content(
        self, resource: mcp_types.TextResourceContents | mcp_types.BlobResourceContents
    ) -> str | messages.BinaryContent:
        """Turn MCP resource contents into text or binary content.

        Text contents are returned as their string. Blob contents are base64-decoded and
        wrapped as binary content, with `application/octet-stream` as the media type when
        the resource does not state one.
        """
        if isinstance(resource, mcp_types.TextResourceContents):
            return resource.text

        if isinstance(resource, mcp_types.BlobResourceContents):
            payload = base64.b64decode(resource.blob)
            media_type = resource.mimeType or 'application/octet-stream'
            binary = messages.BinaryContent(data=payload, media_type=media_type)
            return messages.BinaryContent.narrow_type(binary)

        assert_never(resource)

    def __eq__(self, value: object, /) -> bool:
        """Two servers are equal when both are `MCPServer`s with the same `id` and `tool_prefix`."""
        if not isinstance(value, MCPServer):
            return False
        return self.id == value.id and self.tool_prefix == value.tool_prefix

tool_prefix instance-attribute

tool_prefix: str | None = tool_prefix

A prefix to add to all tools that are registered with the server.

If not empty, will include a trailing underscore (`_`).

e.g. if tool_prefix='foo', then a tool named bar will be registered as foo_bar

log_level instance-attribute

log_level: LoggingLevel | None = log_level

The log level to set when connecting to the server, if any.

See https://modelcontextprotocol.io/specification/2025-03-26/server/utilities/logging#logging for more details.

If None, no log level will be set.

log_handler instance-attribute

log_handler: LoggingFnT | None = log_handler

A handler for logging messages from the server.

timeout instance-attribute

timeout: float = timeout

The timeout in seconds to wait for the client to initialize.

read_timeout instance-attribute

read_timeout: float = read_timeout

Maximum time in seconds to wait for new messages before timing out.

This timeout applies to the long-lived connection after it's established. If no new messages are received within this time, the connection will be considered stale and may be closed. Defaults to 5 minutes (300 seconds).

process_tool_call instance-attribute

process_tool_call: ProcessToolCallback | None = (
    process_tool_call
)

Hook to customize tool calling and optionally pass extra metadata.

allow_sampling instance-attribute

allow_sampling: bool = allow_sampling

Whether to allow MCP sampling through this client.

sampling_model instance-attribute

sampling_model: Model | None = sampling_model

The model to use for sampling.

max_retries instance-attribute

max_retries: int = max_retries

The maximum number of times to retry a tool call.

elicitation_callback class-attribute instance-attribute

elicitation_callback: ElicitationFnT | None = (
    elicitation_callback
)

Callback function to handle elicitation requests from the server.

cache_tools instance-attribute

cache_tools: bool = cache_tools

Whether to cache the list of tools.

When enabled (default), tools are fetched once and cached until either:

- The server sends a `notifications/tools/list_changed` notification
- `MCPServer.__aexit__` is called (when the last context exits)

Set to False for servers that change tools dynamically without sending notifications.

Note: When using durable execution (Temporal, DBOS), tool definitions are additionally cached at the wrapper level across activities/steps, to avoid redundant MCP connections. This wrapper-level cache is not invalidated by tools/list_changed notifications. Set to False to disable all caching if tools may change during a workflow.

cache_resources instance-attribute

cache_resources: bool = cache_resources

Whether to cache the list of resources.

When enabled (default), resources are fetched once and cached until either:

- The server sends a `notifications/resources/list_changed` notification
- `MCPServer.__aexit__` is called (when the last context exits)

Set to False for servers that change resources dynamically without sending notifications.

include_instructions instance-attribute

include_instructions: bool = include_instructions

Whether to include the server's instructions in the agent's instructions.

Defaults to False for backward compatibility.

include_return_schema instance-attribute

include_return_schema: bool | None = include_return_schema

Whether to include return schemas in tool definitions sent to the model.

When None (default), defaults to False unless the IncludeToolReturnSchemas capability is used.

client_streams abstractmethod async

client_streams() -> AsyncIterator[
    tuple[
        MemoryObjectReceiveStream[
            SessionMessage | Exception
        ],
        MemoryObjectSendStream[SessionMessage],
    ]
]

Create the streams for the MCP server.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
559
560
561
562
563
564
565
566
567
568
569
570
571
@abstractmethod
@asynccontextmanager
async def client_streams(
    self,
) -> AsyncIterator[
    tuple[
        MemoryObjectReceiveStream[SessionMessage | Exception],
        MemoryObjectSendStream[SessionMessage],
    ]
]:
    """Create the streams for the MCP server.

    Abstract: subclasses provide an async context manager that yields a
    `(read_stream, write_stream)` pair for the transport they implement.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError('MCP Server subclasses must implement this method.')
    # Never reached, but the `yield` makes this function an async generator,
    # which is the kind of function `asynccontextmanager` wraps.
    yield

server_info property

server_info: Implementation

Access the information sent by the MCP server during initialization.

capabilities property

capabilities: ServerCapabilities

Access the capabilities advertised by the MCP server during initialization.

instructions property

instructions: str | None

Access the instructions sent by the MCP server during initialization.

get_instructions async

get_instructions(
    ctx: RunContext[Any],
) -> InstructionPart | None

Return the MCP server's instructions for how to use its tools.

If include_instructions is True, returns the instructions sent by the MCP server during initialization. Otherwise, returns None.

Instructions from external servers are marked as dynamic since they may change between connections.

Parameters:

Name Type Description Default
ctx RunContext[Any]

The run context for this agent run.

required

Returns:

Type Description
InstructionPart | None

An InstructionPart with the server's instructions if include_instructions is enabled, otherwise None.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
async def get_instructions(self, ctx: RunContext[Any]) -> messages.InstructionPart | None:
    """Provide the MCP server's usage instructions to the agent, when enabled.

    Nothing is returned unless [`include_instructions`][pydantic_ai.mcp.MCPServer.include_instructions]
    is `True`. When it is, the [`instructions`][pydantic_ai.mcp.MCPServer.instructions] received from
    the MCP server during initialization are returned.

    Instructions from external servers are marked as dynamic since they may change between connections.

    Args:
        ctx: The run context for this agent run.

    Returns:
        An `InstructionPart` with the server's instructions if `include_instructions` is enabled
        and instructions are available, otherwise `None`.
    """
    if self.include_instructions:
        try:
            text = self.instructions
        except AttributeError:
            # Server not yet initialized — return None rather than propagating.
            # Durable execution wrappers detect this and fetch via activity/step.
            return None
        if text is not None:
            return messages.InstructionPart(content=text, dynamic=True)
    return None

list_tools async

list_tools() -> list[Tool]

Retrieve tools that are currently active on the server.

Tools are cached by default, with cache invalidation on:

- `notifications/tools/list_changed` notifications from the server
- `__aexit__` when the last context exits

Set cache_tools=False for servers that change tools without sending notifications.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
async def list_tools(self) -> list[mcp_types.Tool]:
    """Return the tools that are currently active on the server.

    With `cache_tools` enabled (the default) the list is served from the cache once it
    has been fetched. The cache is dropped on:
    - `notifications/tools/list_changed` notifications from the server
    - `__aexit__` when the last context exits

    Use `cache_tools=False` for servers that change tools without sending notifications.
    """
    if self.cache_tools:
        cached = self._cached_tools
        if cached is not None:
            return cached

    async with self:
        listing = await self._get_client().list_tools()
        tools = listing.tools
        if self.cache_tools:
            self._cached_tools = tools
        return tools

direct_call_tool async

direct_call_tool(
    name: str,
    args: dict[str, Any],
    metadata: dict[str, Any] | None = None,
) -> ToolResult

Call a tool on the server.

Parameters:

Name Type Description Default
name str

The name of the tool to call.

required
args dict[str, Any]

The arguments to pass to the tool.

required
metadata dict[str, Any] | None

Request-level metadata (optional)

None

Returns:

Type Description
ToolResult

The result of the tool call.

Raises:

Type Description
ModelRetry

If the tool call fails.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
async def direct_call_tool(
    self,
    name: str,
    args: dict[str, Any],
    metadata: dict[str, Any] | None = None,
) -> ToolResult:
    """Call a tool on the server.

    Args:
        name: The name of the tool to call.
        args: The arguments to pass to the tool.
        metadata: Request-level metadata (optional)

    Returns:
        The result of the tool call: the structured content when the result carries it
        and every content part is text, otherwise the mapped content part(s) — a single
        value when there is exactly one part, else a list.

    Raises:
        ModelRetry: If the tool call fails.
    """
    async with self:  # Ensure server is running
        try:
            result = await self._get_client().send_request(
                mcp_types.ClientRequest(
                    mcp_types.CallToolRequest(
                        method='tools/call',
                        params=mcp_types.CallToolRequestParams(
                            name=name,
                            arguments=args,
                            _meta=mcp_types.RequestParams.Meta(**metadata) if metadata else None,
                        ),
                    )
                ),
                mcp_types.CallToolResult,
            )
        except mcp_exceptions.McpError as e:
            # Chain explicitly so the SDK error is kept as `__cause__`, like the
            # resource methods do when they translate `McpError`.
            raise exceptions.ModelRetry(e.error.message) from e

    if result.isError:
        # Only text parts contribute to the retry message; fall back to a generic one.
        message: str | None = None
        if result.content:  # pragma: no branch
            text_parts = [part.text for part in result.content if isinstance(part, mcp_types.TextContent)]
            message = '\n'.join(text_parts)

        raise exceptions.ModelRetry(message or 'MCP tool call failed')

    # Prefer structured content if there are only text parts, which per the docs would contain the JSON-encoded structured content for backward compatibility.
    # See https://github.com/modelcontextprotocol/python-sdk#structured-output
    if (structured := result.structuredContent) and all(
        isinstance(part, mcp_types.TextContent) for part in result.content
    ):
        # The MCP SDK wraps primitives and generic types like list in a `result` key, but we want to use the raw value returned by the tool function.
        # See https://github.com/modelcontextprotocol/python-sdk#structured-output
        if isinstance(structured, dict) and len(structured) == 1 and 'result' in structured:
            return structured['result']
        return structured

    mapped = [await self._map_tool_result_part(part) for part in result.content]
    return mapped[0] if len(mapped) == 1 else mapped

list_resources async

list_resources() -> list[Resource]

Retrieve resources that are currently present on the server.

Resources are cached by default, with cache invalidation on:

- `notifications/resources/list_changed` notifications from the server
- `__aexit__` when the last context exits

Set cache_resources=False for servers that change resources without sending notifications.

Raises:

Type Description
MCPError

If the server returns an error.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
async def list_resources(self) -> list[Resource]:
    """Return the resources the server currently exposes.

    With `cache_resources` enabled (the default) the list is served from the cache once
    it has been fetched. The cache is dropped on:
    - `notifications/resources/list_changed` notifications from the server
    - `__aexit__` when the last context exits

    Use `cache_resources=False` for servers that change resources without sending notifications.

    An empty list is returned (and nothing is cached) when the server does not advertise
    the resources capability.

    Raises:
        MCPError: If the server returns an error.
    """
    if self.cache_resources:
        cached = self._cached_resources
        if cached is not None:
            return cached

    async with self:
        if not self.capabilities.resources:
            return []
        try:
            listing = await self._get_client().list_resources()
            converted = [Resource.from_mcp_sdk(item) for item in listing.resources]
            if self.cache_resources:
                self._cached_resources = converted
            return converted
        except mcp_exceptions.McpError as e:
            raise MCPError.from_mcp_sdk(e) from e

list_resource_templates async

list_resource_templates() -> list[ResourceTemplate]

Retrieve resource templates that are currently present on the server.

Raises:

Type Description
MCPError

If the server returns an error.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
792
793
794
795
796
797
798
799
800
801
802
803
804
805
async def list_resource_templates(self) -> list[ResourceTemplate]:
    """Return the resource templates the server currently exposes.

    An empty list is returned when the server does not advertise the resources capability.

    Raises:
        MCPError: If the server returns an error.
    """
    async with self:  # The connection must be up before talking to the server
        if not self.capabilities.resources:
            return []
        client = self._get_client()
        try:
            response = await client.list_resource_templates()
        except mcp_exceptions.McpError as e:
            raise MCPError.from_mcp_sdk(e) from e

    templates = response.resourceTemplates
    return [ResourceTemplate.from_mcp_sdk(template) for template in templates]

read_resource async

read_resource(
    uri: str,
) -> str | BinaryContent | list[str | BinaryContent]
read_resource(
    uri: Resource,
) -> str | BinaryContent | list[str | BinaryContent]
read_resource(
    uri: str | Resource,
) -> str | BinaryContent | list[str | BinaryContent]

Read the contents of a specific resource by URI.

Parameters:

Name Type Description Default
uri str | Resource

The URI of the resource to read, or a Resource object.

required

Returns:

Type Description
str | BinaryContent | list[str | BinaryContent]

The resource contents. If the resource has a single content item, returns that item directly.

str | BinaryContent | list[str | BinaryContent]

If the resource has multiple content items, returns a list of items.

Raises:

Type Description
MCPError

If the server returns an error.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
async def read_resource(
    self, uri: str | Resource
) -> str | messages.BinaryContent | list[str | messages.BinaryContent]:
    """Fetch the contents of a single resource from the server.

    Args:
        uri: Either the resource URI as a string, or a `Resource` object whose `uri` is used.

    Returns:
        The converted content item itself when the server sends back exactly one item;
        otherwise a list with one converted entry per content item.

    Raises:
        MCPError: If the server returns an error.
    """
    target = uri.uri if not isinstance(uri, str) else uri
    async with self:  # Entering the context makes sure the server is running
        try:
            response = await self._get_client().read_resource(AnyUrl(target))
        except mcp_exceptions.McpError as sdk_error:
            raise MCPError.from_mcp_sdk(sdk_error) from sdk_error

    items = response.contents
    if len(items) == 1:
        # A lone content item is unwrapped instead of being returned as a one-element list.
        return self._get_content(items[0])
    return [self._get_content(item) for item in items]

__aenter__ async

__aenter__() -> Self

Enter the MCP server context.

The first call starts the connection (spawning a subprocess for stdio servers, opening an HTTP connection for HTTP servers). Subsequent calls — from any task — share the same connection via reference counting. The connection is torn down when the last async with scope exits.

Because the session runs in a dedicated background task, entering and exiting from different tasks (e.g. asyncio.gather children, fasta2a workers, or graph node tasks) is safe: the underlying transport's cancel scopes never cross task boundaries.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
async def __aenter__(self) -> Self:
    """Enter the MCP server context.

    The first call starts the connection (spawning a subprocess for stdio servers,
    opening an HTTP connection for HTTP servers). Subsequent calls — from any task
    — share the same connection via reference counting. The connection is torn
    down when the last `async with` scope exits.

    Because the session runs in a dedicated background task, entering and exiting
    from different tasks (e.g. `asyncio.gather` children, fasta2a workers, or
    graph node tasks) is safe: the underlying transport's cancel scopes never
    cross task boundaries.

    If the wait for start-up is interrupted (for example by cancellation), the
    freshly started session task is torn down and the interruption is re-raised.
    If start-up finishes with a recorded connection error, that error is raised.
    In both cases the nesting counter is left unchanged.

    Returns:
        This server instance.
    """
    # Start-up and the nesting-counter update both happen while holding the enter lock.
    async with self._enter_lock:
        state = self._session_state
        # A session is started only when there is no session task yet or the previous one has finished.
        need_to_start = state.session_task is None or state.session_task.done()
        if need_to_start:
            # Reset the per-session state before launching the runner task.
            state.stop_event = anyio.Event()
            state.ready_event = anyio.Event()
            state.connect_error = None
            state.client = None
            state.session_task = asyncio.create_task(self._session_runner())
            try:
                # presumably `_session_runner` sets `ready_event` on success and on failure, recording
                # a failure in `connect_error` — confirm against its implementation
                await state.ready_event.wait()
            except BaseException:
                # Cancelled while waiting for startup: tear down the session task
                # without impacting anyone else (we hold the lock and just started it)
                task = state.session_task
                state.stop_event.set()
                await state.force_close(task)
                state.session_task = None
                state.client = None
                raise
            if state.connect_error is not None:
                # Connection failed during startup; surface the error and reset state
                state.session_task = None
                err = state.connect_error
                state.connect_error = None
                raise err
        # Reached only when a session is available (already running, or started above), so count this entry.
        state.nesting_counter += 1
    return self

is_running property

is_running: bool

Check if the MCP server is running.

MCPServerStdio deprecated

Bases: MCPServer

Deprecated

MCPServerStdio is deprecated and will be removed in v2. Use MCPToolset('path/to/script.py') for Python scripts, MCPToolset('script.js') for Node scripts, or MCPToolset(fastmcp.client.transports.StdioTransport(command='...', args=[...])) for arbitrary commands.

Runs an MCP server in a subprocess and communicates with it over stdin/stdout.

This class implements the stdio transport from the MCP specification. See https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio for more information.

Note

Using this class as an async context manager will start the server as a subprocess when entering the context, and stop it when exiting the context.

Example:

from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerStdio

server = MCPServerStdio(  # (1)!
    'uv', args=['run', 'mcp-run-python', 'stdio'], timeout=10
)
agent = Agent('openai:gpt-5.2', toolsets=[server])

  1. See MCP Run Python for more information.
Source code in pydantic_ai_slim/pydantic_ai/mcp.py
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
@deprecated(
    '`MCPServerStdio` is deprecated and will be removed in v2. '
    "Use `MCPToolset('path/to/script.py')` for Python scripts, `MCPToolset('script.js')` for Node "
    "scripts, or `MCPToolset(fastmcp.client.transports.StdioTransport(command='...', args=[...]))` "
    'for arbitrary commands.'
)
class MCPServerStdio(MCPServer):
    """Runs an MCP server in a subprocess and communicates with it over stdin/stdout.

    This class implements the stdio transport from the MCP specification.
    See <https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio> for more information.

    !!! note
        Using this class as an async context manager will start the server as a subprocess when entering the context,
        and stop it when exiting the context.

    Example:
    ```python {py="3.10"}
    from pydantic_ai import Agent
    from pydantic_ai.mcp import MCPServerStdio

    server = MCPServerStdio(  # (1)!
        'uv', args=['run', 'mcp-run-python', 'stdio'], timeout=10
    )
    agent = Agent('openai:gpt-5.2', toolsets=[server])
    ```

    1. See [MCP Run Python](https://github.com/pydantic/mcp-run-python) for more information.
    """

    command: str
    """The command to run."""

    args: Sequence[str]
    """The arguments to pass to the command."""

    env: dict[str, str] | None
    """The environment variables the CLI server will have access to.

    By default the subprocess will not inherit any environment variables from the parent process.
    If you want to inherit the environment variables from the parent process, use `env=os.environ`.
    """

    cwd: str | Path | None
    """The working directory to use when spawning the process."""

    # The remaining fields are re-declared from the parent class so they appear as fields of this class.
    tool_prefix: str | None
    log_level: mcp_types.LoggingLevel | None
    log_handler: LoggingFnT | None
    timeout: float
    read_timeout: float
    process_tool_call: ProcessToolCallback | None
    allow_sampling: bool
    sampling_model: models.Model | None
    max_retries: int
    # NOTE(review): the only re-declared field that also gets a class-level default — confirm this is intentional.
    elicitation_callback: ElicitationFnT | None = None
    cache_tools: bool
    cache_resources: bool
    include_instructions: bool

    def __init__(
        self,
        command: str,
        args: Sequence[str],
        *,
        env: dict[str, str] | None = None,
        cwd: str | Path | None = None,
        tool_prefix: str | None = None,
        log_level: mcp_types.LoggingLevel | None = None,
        log_handler: LoggingFnT | None = None,
        timeout: float = 5,
        read_timeout: float = 5 * 60,
        process_tool_call: ProcessToolCallback | None = None,
        allow_sampling: bool = True,
        sampling_model: models.Model | None = None,
        max_retries: int = 1,
        elicitation_callback: ElicitationFnT | None = None,
        cache_tools: bool = True,
        cache_resources: bool = True,
        include_instructions: bool = False,
        include_return_schema: bool | None = None,
        id: str | None = None,
        client_info: mcp_types.Implementation | None = None,
    ):
        """Build a new MCP server.

        Args:
            command: The command to run.
            args: The arguments to pass to the command.
            env: The environment variables to set in the subprocess.
            cwd: The working directory to use when spawning the process.
            tool_prefix: A prefix to add to all tools that are registered with the server.
            log_level: The log level to set when connecting to the server, if any.
            log_handler: A handler for logging messages from the server.
            timeout: The timeout in seconds to wait for the client to initialize.
            read_timeout: Maximum time in seconds to wait for new messages before timing out.
            process_tool_call: Hook to customize tool calling and optionally pass extra metadata.
            allow_sampling: Whether to allow MCP sampling through this client.
            sampling_model: The model to use for sampling.
            max_retries: The maximum number of times to retry a tool call.
            elicitation_callback: Callback function to handle elicitation requests from the server.
            cache_tools: Whether to cache the list of tools.
                See [`MCPServer.cache_tools`][pydantic_ai.mcp.MCPServer.cache_tools].
            cache_resources: Whether to cache the list of resources.
                See [`MCPServer.cache_resources`][pydantic_ai.mcp.MCPServer.cache_resources].
            include_instructions: Whether to include the server's instructions in the agent's instructions.
                See [`MCPServer.include_instructions`][pydantic_ai.mcp.MCPServer.include_instructions].
            include_return_schema: Whether to include return schemas in tool definitions.
                See [`MCPServer.include_return_schema`][pydantic_ai.mcp.MCPServer.include_return_schema].
            id: An optional unique ID for the MCP server. An MCP server needs to have an ID in order to be used in a durable execution environment like Temporal, in which case the ID will be used to identify the server's activities within the workflow.
            client_info: Information describing the MCP client implementation.
        """
        # Subprocess launch settings are stored on this class; they are assigned before the parent constructor runs.
        self.command = command
        self.args = args
        self.env = env
        self.cwd = cwd

        # The first twelve settings are forwarded positionally, so their order has to match the
        # parent constructor's parameter order; the last four are passed by keyword.
        super().__init__(
            tool_prefix,
            log_level,
            log_handler,
            timeout,
            read_timeout,
            process_tool_call,
            allow_sampling,
            sampling_model,
            max_retries,
            elicitation_callback,
            cache_tools,
            cache_resources,
            id=id,
            include_instructions=include_instructions,
            include_return_schema=include_return_schema,
            client_info=client_info,
        )

    @classmethod
    def __get_pydantic_core_schema__(cls, _: Any, __: Any) -> CoreSchema:
        """Return a schema that validates a dict and then builds an `MCPServerStdio` from it.

        The typed-dict schema declares `command` (string), `args` (list of strings) and an
        optional `env` (string-to-string dict); no other constructor option is declared here.
        """
        return core_schema.no_info_after_validator_function(
            # After validation the dict is expanded into constructor keyword arguments.
            lambda dct: MCPServerStdio(**dct),  # pyright: ignore[reportDeprecated]
            core_schema.typed_dict_schema(
                {
                    'command': core_schema.typed_dict_field(core_schema.str_schema()),
                    'args': core_schema.typed_dict_field(core_schema.list_schema(core_schema.str_schema())),
                    'env': core_schema.typed_dict_field(
                        core_schema.dict_schema(core_schema.str_schema(), core_schema.str_schema()),
                        required=False,
                    ),
                }
            ),
        )

    @asynccontextmanager
    async def client_streams(
        self,
    ) -> AsyncIterator[
        tuple[
            MemoryObjectReceiveStream[SessionMessage | Exception],
            MemoryObjectSendStream[SessionMessage],
        ]
    ]:
        """Open a `stdio_client` for the configured command and yield its read and write streams."""
        # `args` may be any sequence, so it is copied into a list for `StdioServerParameters`.
        server = StdioServerParameters(command=self.command, args=list(self.args), env=self.env, cwd=self.cwd)
        async with stdio_client(server=server) as (read_stream, write_stream):
            yield read_stream, write_stream

    def __repr__(self) -> str:
        """Return `ClassName(command=..., args=...)`, with `id=...` appended when an ID is set."""
        repr_args = [
            f'command={self.command!r}',
            f'args={self.args!r}',
        ]
        if self.id:
            repr_args.append(f'id={self.id!r}')  # pragma: lax no cover
        return f'{self.__class__.__name__}({", ".join(repr_args)})'

    def __eq__(self, value: object, /) -> bool:
        """Compare via the parent's `__eq__`, then require an `MCPServerStdio` with equal command, args, env and cwd."""
        return (
            super().__eq__(value)
            and isinstance(value, MCPServerStdio)  # pyright: ignore[reportDeprecated]
            and self.command == value.command
            and self.args == value.args
            and self.env == value.env
            and self.cwd == value.cwd
        )

__init__

__init__(
    command: str,
    args: Sequence[str],
    *,
    env: dict[str, str] | None = None,
    cwd: str | Path | None = None,
    tool_prefix: str | None = None,
    log_level: LoggingLevel | None = None,
    log_handler: LoggingFnT | None = None,
    timeout: float = 5,
    read_timeout: float = 5 * 60,
    process_tool_call: ProcessToolCallback | None = None,
    allow_sampling: bool = True,
    sampling_model: Model | None = None,
    max_retries: int = 1,
    elicitation_callback: ElicitationFnT | None = None,
    cache_tools: bool = True,
    cache_resources: bool = True,
    include_instructions: bool = False,
    include_return_schema: bool | None = None,
    id: str | None = None,
    client_info: Implementation | None = None
)

Build a new MCP server.

Parameters:

Name Type Description Default
command str

The command to run.

required
args Sequence[str]

The arguments to pass to the command.

required
env dict[str, str] | None

The environment variables to set in the subprocess.

None
cwd str | Path | None

The working directory to use when spawning the process.

None
tool_prefix str | None

A prefix to add to all tools that are registered with the server.

None
log_level LoggingLevel | None

The log level to set when connecting to the server, if any.

None
log_handler LoggingFnT | None

A handler for logging messages from the server.

None
timeout float

The timeout in seconds to wait for the client to initialize.

5
read_timeout float

Maximum time in seconds to wait for new messages before timing out.

5 * 60
process_tool_call ProcessToolCallback | None

Hook to customize tool calling and optionally pass extra metadata.

None
allow_sampling bool

Whether to allow MCP sampling through this client.

True
sampling_model Model | None

The model to use for sampling.

None
max_retries int

The maximum number of times to retry a tool call.

1
elicitation_callback ElicitationFnT | None

Callback function to handle elicitation requests from the server.

None
cache_tools bool

Whether to cache the list of tools. See MCPServer.cache_tools.

True
cache_resources bool

Whether to cache the list of resources. See MCPServer.cache_resources.

True
include_instructions bool

Whether to include the server's instructions in the agent's instructions. See MCPServer.include_instructions.

False
include_return_schema bool | None

Whether to include return schemas in tool definitions. See MCPServer.include_return_schema.

None
id str | None

An optional unique ID for the MCP server. An MCP server needs to have an ID in order to be used in a durable execution environment like Temporal, in which case the ID will be used to identify the server's activities within the workflow.

None
client_info Implementation | None

Information describing the MCP client implementation.

None
Source code in pydantic_ai_slim/pydantic_ai/mcp.py
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
def __init__(
    self,
    command: str,
    args: Sequence[str],
    *,
    env: dict[str, str] | None = None,
    cwd: str | Path | None = None,
    tool_prefix: str | None = None,
    log_level: mcp_types.LoggingLevel | None = None,
    log_handler: LoggingFnT | None = None,
    timeout: float = 5,
    read_timeout: float = 5 * 60,
    process_tool_call: ProcessToolCallback | None = None,
    allow_sampling: bool = True,
    sampling_model: models.Model | None = None,
    max_retries: int = 1,
    elicitation_callback: ElicitationFnT | None = None,
    cache_tools: bool = True,
    cache_resources: bool = True,
    include_instructions: bool = False,
    include_return_schema: bool | None = None,
    id: str | None = None,
    client_info: mcp_types.Implementation | None = None,
):
    """Build a new MCP server.

    Only `command` and `args` may be passed positionally; every other option is keyword-only.

    Args:
        command: The command to run.
        args: The arguments to pass to the command.
        env: The environment variables to set in the subprocess. By default the subprocess will not
            inherit any environment variables from the parent process; use `env=os.environ` to inherit them.
        cwd: The working directory to use when spawning the process.
        tool_prefix: A prefix to add to all tools that are registered with the server.
        log_level: The log level to set when connecting to the server, if any.
        log_handler: A handler for logging messages from the server.
        timeout: The timeout in seconds to wait for the client to initialize.
        read_timeout: Maximum time in seconds to wait for new messages before timing out.
        process_tool_call: Hook to customize tool calling and optionally pass extra metadata.
        allow_sampling: Whether to allow MCP sampling through this client.
        sampling_model: The model to use for sampling.
        max_retries: The maximum number of times to retry a tool call.
        elicitation_callback: Callback function to handle elicitation requests from the server.
        cache_tools: Whether to cache the list of tools.
            See [`MCPServer.cache_tools`][pydantic_ai.mcp.MCPServer.cache_tools].
        cache_resources: Whether to cache the list of resources.
            See [`MCPServer.cache_resources`][pydantic_ai.mcp.MCPServer.cache_resources].
        include_instructions: Whether to include the server's instructions in the agent's instructions.
            See [`MCPServer.include_instructions`][pydantic_ai.mcp.MCPServer.include_instructions].
        include_return_schema: Whether to include return schemas in tool definitions.
            See [`MCPServer.include_return_schema`][pydantic_ai.mcp.MCPServer.include_return_schema].
        id: An optional unique ID for the MCP server. An MCP server needs to have an ID in order to be used
            in a durable execution environment like Temporal, in which case the ID will be used to identify
            the server's activities within the workflow.
        client_info: Information describing the MCP client implementation.
    """
    # Subprocess launch settings are stored on the instance before the parent constructor runs.
    self.command = command
    self.args = args
    self.env = env
    self.cwd = cwd

    # The first twelve settings are forwarded positionally, so their order has to match the
    # parent constructor's parameter order; the last four are passed by keyword.
    super().__init__(
        tool_prefix,
        log_level,
        log_handler,
        timeout,
        read_timeout,
        process_tool_call,
        allow_sampling,
        sampling_model,
        max_retries,
        elicitation_callback,
        cache_tools,
        cache_resources,
        id=id,
        include_instructions=include_instructions,
        include_return_schema=include_return_schema,
        client_info=client_info,
    )

command instance-attribute

command: str = command

The command to run.

args instance-attribute

args: Sequence[str] = args

The arguments to pass to the command.

env instance-attribute

env: dict[str, str] | None = env

The environment variables the CLI server will have access to.

By default the subprocess will not inherit any environment variables from the parent process. If you want to inherit the environment variables from the parent process, use env=os.environ.

cwd instance-attribute

cwd: str | Path | None = cwd

The working directory to use when spawning the process.

MCPServerSSE deprecated

Bases: _MCPServerHTTP

Deprecated

MCPServerSSE is deprecated and will be removed in v2. Use MCPToolset('http://.../sse') instead — the SSE transport is automatically inferred from URLs ending in /sse.

An MCP server that connects over HTTP using the SSE (Server-Sent Events) transport.

This class implements the SSE transport from the MCP specification. See https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse for more information.

Note

Using this class as an async context manager will create a new pool of HTTP connections to connect to a server which should already be running.

Example:

from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerSSE

server = MCPServerSSE('http://localhost:3001/sse')
agent = Agent('openai:gpt-5.2', toolsets=[server])

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
@deprecated(
    '`MCPServerSSE` is deprecated and will be removed in v2. '
    "Use `MCPToolset('http://.../sse')` instead — the SSE transport is automatically inferred "
    'from URLs ending in `/sse`.'
)
class MCPServerSSE(_MCPServerHTTP):
    """An MCP server reached over HTTP using the Server-Sent Events (SSE) transport.

    This class implements the SSE transport from the MCP specification.
    See <https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse> for more information.

    !!! note
        Using this class as an async context manager will create a new pool of HTTP connections to connect
        to a server which should already be running.

    Example:
    ```python {py="3.10"}
    from pydantic_ai import Agent
    from pydantic_ai.mcp import MCPServerSSE

    server = MCPServerSSE('http://localhost:3001/sse')
    agent = Agent('openai:gpt-5.2', toolsets=[server])
    ```
    """

    @classmethod
    def __get_pydantic_core_schema__(cls, _: Any, __: Any) -> CoreSchema:
        """Return a schema that validates a `url` / optional `headers` dict and builds an `MCPServerSSE` from it."""
        url_field = core_schema.typed_dict_field(core_schema.str_schema())
        headers_field = core_schema.typed_dict_field(
            core_schema.dict_schema(core_schema.str_schema(), core_schema.str_schema()), required=False
        )
        config_schema = core_schema.typed_dict_schema({'url': url_field, 'headers': headers_field})
        return core_schema.no_info_after_validator_function(
            lambda config: MCPServerSSE(**config),  # pyright: ignore[reportDeprecated]
            config_schema,
        )

    # sse_client has a hang bug (https://github.com/modelcontextprotocol/python-sdk/issues/1811)
    # that prevents testing SSE transport in CI.
    # TODO: Remove pragma and add a test
    # once https://github.com/modelcontextprotocol/python-sdk/pull/1838 is released.
    @asynccontextmanager
    async def client_streams(  # pragma: no cover
        self,
    ) -> AsyncIterator[
        tuple[
            MemoryObjectReceiveStream[SessionMessage | Exception],
            MemoryObjectSendStream[SessionMessage],
        ]
    ]:
        """Open an `sse_client` connection to `self.url` and yield its read and write streams.

        Raises:
            ValueError: If both `http_client` and `headers` are set.
        """
        if self.http_client and self.headers:
            raise ValueError('`http_client` is mutually exclusive with `headers`.')

        # Only one transport option differs between the two modes: either the configured headers
        # are passed on, or a factory that hands back the caller-supplied HTTP client.
        if self.http_client is None:
            transport_options: dict[str, Any] = {'headers': self.headers}
        else:

            def reuse_http_client(
                headers: dict[str, str] | None = None,
                timeout: httpx.Timeout | None = None,
                auth: httpx.Auth | None = None,
            ) -> httpx.AsyncClient:
                assert self.http_client is not None
                return self.http_client

            transport_options = {'httpx_client_factory': reuse_http_client}

        async with sse_client(
            url=self.url,
            timeout=self.timeout,
            sse_read_timeout=self.read_timeout,
            **transport_options,
        ) as streams:
            # Anything the client yields beyond the first two streams is ignored.
            incoming, outgoing, *_ = streams
            yield incoming, outgoing

    def __eq__(self, value: object, /) -> bool:
        """Compare via the parent's `__eq__`, then require an `MCPServerSSE` with the same URL."""
        return (
            super().__eq__(value)
            and isinstance(value, MCPServerSSE)  # pyright: ignore[reportDeprecated]
            and self.url == value.url
        )

MCPServerHTTP deprecated

Bases: MCPServerSSE

Deprecated

The MCPServerHTTP class is deprecated, use MCPServerSSE instead.

An MCP server that connects over HTTP using the old SSE transport.

This class implements the SSE transport from the MCP specification. See https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse for more information.

Note

Using this class as an async context manager will create a new pool of HTTP connections to connect to a server which should already be running.

Example:

from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerHTTP

server = MCPServerHTTP('http://localhost:3001/sse')
agent = Agent('openai:gpt-5.2', toolsets=[server])

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
@deprecated('The `MCPServerHTTP` class is deprecated, use `MCPServerSSE` instead.')
class MCPServerHTTP(MCPServerSSE):  # pyright: ignore[reportDeprecated]
    """An MCP server that connects over HTTP using the old SSE transport.

    This is a deprecated subclass of `MCPServerSSE` that adds no attributes or methods of its own;
    use `MCPServerSSE` instead.

    This class implements the SSE transport from the MCP specification.
    See <https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse> for more information.

    !!! note
        Using this class as an async context manager will create a new pool of HTTP connections to connect
        to a server which should already be running.

    Example:
    ```python {py="3.10" test="skip"}
    from pydantic_ai import Agent
    from pydantic_ai.mcp import MCPServerHTTP

    server = MCPServerHTTP('http://localhost:3001/sse')
    agent = Agent('openai:gpt-5.2', toolsets=[server])
    ```
    """

MCPServerStreamableHTTP deprecated

Bases: _MCPServerHTTP

Deprecated

MCPServerStreamableHTTP is deprecated and will be removed in v2. Use MCPToolset('http://.../mcp') instead — Streamable HTTP is the default for HTTP URLs.

An MCP server that connects over HTTP using the Streamable HTTP transport.

This class implements the Streamable HTTP transport from the MCP specification. See https://modelcontextprotocol.io/introduction#streamable-http for more information.

Note

Using this class as an async context manager will create a new pool of HTTP connections to connect to a server which should already be running.

Example:

from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerStreamableHTTP

server = MCPServerStreamableHTTP('http://localhost:8000/mcp')
agent = Agent('openai:gpt-5.2', toolsets=[server])

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
@deprecated(
    '`MCPServerStreamableHTTP` is deprecated and will be removed in v2. '
    "Use `MCPToolset('http://.../mcp')` instead — Streamable HTTP is the default for HTTP URLs."
)
class MCPServerStreamableHTTP(_MCPServerHTTP):
    """An MCP server that connects over HTTP using the Streamable HTTP transport.

    This class implements the Streamable HTTP transport from the MCP specification.
    See <https://modelcontextprotocol.io/introduction#streamable-http> for more information.

    !!! note
        Using this class as an async context manager will create a new pool of HTTP connections to connect
        to a server which should already be running.

    Example:
    ```python {py="3.10"}
    from pydantic_ai import Agent
    from pydantic_ai.mcp import MCPServerStreamableHTTP

    server = MCPServerStreamableHTTP('http://localhost:8000/mcp')
    agent = Agent('openai:gpt-5.2', toolsets=[server])
    ```
    """

    @classmethod
    def __get_pydantic_core_schema__(cls, _: Any, __: Any) -> CoreSchema:
        """Return a schema that validates a `url` / optional `headers` dict and builds an instance from it."""
        return core_schema.no_info_after_validator_function(
            lambda dct: MCPServerStreamableHTTP(**dct),  # pyright: ignore[reportDeprecated]
            core_schema.typed_dict_schema(
                {
                    'url': core_schema.typed_dict_field(core_schema.str_schema()),
                    'headers': core_schema.typed_dict_field(
                        core_schema.dict_schema(core_schema.str_schema(), core_schema.str_schema()), required=False
                    ),
                }
            ),
        )

    @asynccontextmanager
    async def client_streams(
        self,
    ) -> AsyncIterator[
        tuple[
            MemoryObjectReceiveStream[SessionMessage | Exception],
            MemoryObjectSendStream[SessionMessage],
        ]
    ]:
        """Open the Streamable HTTP transport to `self.url` and yield its read and write streams.

        A caller-supplied `http_client` is used as-is and is not closed here. When none is set, an
        `httpx.AsyncClient` is created from `timeout`, `read_timeout` and `headers` and is closed on
        exit, including when opening the transport itself fails.

        Raises:
            ValueError: If both `http_client` and `headers` are set.
        """
        if self.http_client and self.headers:
            raise ValueError('`http_client` is mutually exclusive with `headers`.')

        aexit_stack = AsyncExitStack()
        try:
            # Both contexts are entered inside the `try`: if opening the transport raises, the
            # `finally` still closes an `httpx.AsyncClient` that was created just above, instead
            # of leaking it.
            http_client = self.http_client or await aexit_stack.enter_async_context(
                httpx.AsyncClient(timeout=httpx.Timeout(self.timeout, read=self.read_timeout), headers=self.headers)
            )
            # Anything the client yields beyond the first two streams is ignored.
            read_stream, write_stream, *_ = await aexit_stack.enter_async_context(
                streamable_http_client(self.url, http_client=http_client)
            )
            yield read_stream, write_stream
        finally:
            await aexit_stack.aclose()

    def __eq__(self, value: object, /) -> bool:
        """Compare via the parent's `__eq__`, then require an `MCPServerStreamableHTTP` with the same URL."""
        return super().__eq__(value) and isinstance(value, MCPServerStreamableHTTP) and self.url == value.url  # pyright: ignore[reportDeprecated]

ToolResult module-attribute

ToolResult = (
    str
    | BinaryContent
    | dict[str, Any]
    | list[Any]
    | Sequence[
        str | BinaryContent | dict[str, Any] | list[Any]
    ]
)

The result type of an MCP tool call.

CallToolFunc

Bases: Protocol

A callable that invokes an MCP tool — typically MCPToolset.direct_call_tool or its legacy equivalent.

Passed to user-defined ProcessToolCallback functions as the underlying call hook. metadata is keyword-only — pass it as await call_tool(name, args, metadata=...).

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
class CallToolFunc(Protocol):
    """A callable that invokes an MCP tool — typically `MCPToolset.direct_call_tool` or its legacy equivalent.

    Passed to user-defined [`ProcessToolCallback`][pydantic_ai.mcp.ProcessToolCallback] functions as
    the underlying call hook. `metadata` is keyword-only — pass it as
    `await call_tool(name, args, metadata=...)`.

    The call takes the tool `name`, its `args` as a `dict[str, Any]`, and an optional keyword-only
    `metadata` dict (defaulting to `None`), and resolves to a `ToolResult`.
    """

    async def __call__(
        self,
        name: str,
        args: dict[str, Any],
        *,
        metadata: dict[str, Any] | None = None,
    ) -> ToolResult: ...

ProcessToolCallback module-attribute

ProcessToolCallback = Callable[
    [RunContext[Any], CallToolFunc, str, dict[str, Any]],
    Awaitable[ToolResult],
]

A process tool callback.

It accepts a run context, the original tool call function, a tool name, and arguments.

Allows wrapping an MCP server tool call to customize it, including adding extra request metadata.

MCPToolsetClient module-attribute

MCPToolsetClient: TypeAlias = (
    "FastMCPClient[Any] | ClientTransport | FastMCP | FastMCP1Server | AnyUrl | Path | str"
)

Anything MCPToolset accepts as its client argument — a pre-built fastmcp.Client, a FastMCP ClientTransport, an in-process FastMCP server, an AnyUrl, a script Path, or a URL/path/script string.

For multi-server JSON config files, use load_mcp_toolsets instead — it expands env vars and constructs one MCPToolset per server entry.

MCPToolset dataclass

Bases: AbstractToolset[AgentDepsT]

A toolset for connecting to an MCP server.

MCPToolset is the recommended way to use Model Context Protocol servers in Pydantic AI. It is built on the FastMCP Client, which supports the full MCP protocol — tools, resources, sampling, elicitation, OAuth — and a wide range of transports (HTTP, SSE, stdio, in-process FastMCP servers, multi-server configs).

Pass any input that FastMCP can build a transport from — a URL, a script path, a FastMCP server instance for in-process testing — or a pre-built fastmcp.Client for full control over its configuration. For multi-server JSON config files, use load_mcp_toolsets instead.

Example — connect to a streamable-HTTP MCP server:

from pydantic_ai import Agent
from pydantic_ai.mcp import MCPToolset

toolset = MCPToolset('http://localhost:8000/mcp')
agent = Agent('openai:gpt-5', toolsets=[toolset])

Example — connect to a local stdio MCP server:

from pydantic_ai.mcp import MCPToolset

toolset = MCPToolset('my_mcp_server.py')

Example — pass a pre-built FastMCP Client for full configuration control:

from fastmcp.client import Client
from fastmcp.client.transports import StreamableHttpTransport

from pydantic_ai.mcp import MCPToolset

client = Client(StreamableHttpTransport('http://localhost:8000/mcp'), auth='oauth')
toolset = MCPToolset(client)
Source code in pydantic_ai_slim/pydantic_ai/mcp.py
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
@dataclass(init=False, repr=False)
class MCPToolset(AbstractToolset[AgentDepsT]):
    """A toolset for connecting to an MCP server.

    `MCPToolset` is the recommended way to use [Model Context Protocol](https://modelcontextprotocol.io)
    servers in Pydantic AI. It is built on the [FastMCP](https://gofastmcp.com) `Client`, which
    supports the full MCP protocol — tools, resources, sampling, elicitation, OAuth — and a wide
    range of transports (HTTP, SSE, stdio, in-process FastMCP servers, multi-server configs).

    Pass any input that FastMCP can build a transport from — a URL, a script path, a `FastMCP`
    server instance for in-process testing — or a pre-built `fastmcp.Client` for full control over
    its configuration. For multi-server JSON config files, use
    [`load_mcp_toolsets`][pydantic_ai.mcp.load_mcp_toolsets] instead.

    Example — connect to a streamable-HTTP MCP server:

    ```python {test="skip"}
    from pydantic_ai import Agent
    from pydantic_ai.mcp import MCPToolset

    toolset = MCPToolset('http://localhost:8000/mcp')
    agent = Agent('openai:gpt-5', toolsets=[toolset])
    ```

    Example — connect to a local stdio MCP server:

    ```python {test="skip"}
    from pydantic_ai.mcp import MCPToolset

    toolset = MCPToolset('my_mcp_server.py')
    ```

    Example — pass a pre-built FastMCP Client for full configuration control:

    ```python {test="skip"}
    from fastmcp.client import Client
    from fastmcp.client.transports import StreamableHttpTransport

    from pydantic_ai.mcp import MCPToolset

    client = Client(StreamableHttpTransport('http://localhost:8000/mcp'), auth='oauth')
    toolset = MCPToolset(client)
    ```
    """

    client: FastMCPClient[Any]
    """The underlying FastMCP `Client`. Always normalized to a `fastmcp.Client` regardless of how
    the toolset was constructed."""

    tool_error_behavior: Literal['retry', 'error']
    """How to handle tool errors raised by the server.

    `'retry'` (default) raises [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] so the model can
    self-correct; `'error'` propagates the underlying `fastmcp.exceptions.ToolError` to the caller.
    """

    max_retries: int | None
    """Maximum number of times a tool call may be retried after a `ModelRetry`.

    `None` (default) inherits the agent's retry count at runtime. Set explicitly to override.
    """

    cache_tools: bool
    """Whether to cache the list of tools across `get_tools()` calls.

    When enabled (default), tools are fetched once and cached until either:

    - The server sends a `notifications/tools/list_changed` notification
    - The toolset is fully exited (last `__aexit__` matches the first `__aenter__`)

    Set to `False` for servers that change tools dynamically without sending notifications, or when
    passing a pre-built FastMCP Client (the cache-invalidation message handler isn't installed in
    that case, so caches are only invalidated by session close).
    """

    cache_resources: bool
    """Whether to cache the list of resources across `list_resources()` calls.

    Same semantics as [`cache_tools`][pydantic_ai.mcp.MCPToolset.cache_tools] but for
    `notifications/resources/list_changed` notifications.
    """

    include_instructions: bool
    """Whether to include the server's `initialize` instructions string in the agent's instruction set.

    Defaults to `False` for backward compatibility. When `True`, the instructions returned by the
    server during initialization are added to the agent's instructions.
    """

    include_return_schema: bool | None
    """Whether to include each tool's `outputSchema` in the schema sent to the model.

    When `None` (the default), defaults to `False` unless the
    [`IncludeToolReturnSchemas`][pydantic_ai.capabilities.IncludeToolReturnSchemas] capability is
    used.
    """

    process_tool_call: ProcessToolCallback | None
    """Hook to wrap tool calls — useful for adding request-level metadata, custom retry policies,
    or telemetry. See [`ProcessToolCallback`][pydantic_ai.mcp.ProcessToolCallback].
    """

    sampling_model: models.Model | None
    """A Pydantic AI model that the server may sample from via the MCP `sampling/createMessage` flow.

    When set (and no explicit `sampling_handler` is passed), Pydantic AI builds a sampling handler
    that delegates to this model with the request's `maxTokens`/`temperature`/`stopSequences`
    settings applied. If both `sampling_model` and `sampling_handler` are passed, an error is raised.
    """

    log_level: mcp_types.LoggingLevel | None
    """Log level requested from the server via `logging/setLevel` after initialization.

    `None` (default) leaves the server's default log level alone. Combine with `log_handler` to
    receive log messages.
    """

    _id: str | None
    _server_info: mcp_types.Implementation | None
    _server_capabilities: ServerCapabilities | None
    _instructions: str | None
    _cached_tools: list[mcp_types.Tool] | None
    _cached_resources: list[Resource] | None
    _running_count: int
    _exit_stack: AsyncExitStack | None
    _user_message_handler: MessageHandlerT | None

    @functools.cached_property
    def _enter_lock(self) -> anyio.Lock:
        """Lock guarding `__aenter__`/`__aexit__`, built lazily on first access and then cached."""
        # Creation is deferred because an `anyio.Lock` binds to the event loop on which it is first
        # used: building it on demand ties it to the running loop and sidesteps problems with
        # Temporal's workflow sandbox.
        lock = anyio.Lock()
        return lock

    def __init__(
        self,
        client: MCPToolsetClient,
        *,
        # Pydantic AI-layer config
        id: str | None = None,
        max_retries: int | None = None,
        tool_error_behavior: Literal['retry', 'error'] = 'retry',
        process_tool_call: ProcessToolCallback | None = None,
        cache_tools: bool = True,
        cache_resources: bool = True,
        include_instructions: bool = False,
        include_return_schema: bool | None = None,
        # Sampling — high-level shortcut and low-level escape hatch
        sampling_model: models.Model | None = None,
        sampling_handler: SamplingHandler[Any, Any] | None = None,
        # MCP protocol kwargs (forwarded to a default FastMCP Client when one isn't passed)
        elicitation_handler: ElicitationHandler[Any, Any] | None = None,
        log_handler: LogHandler | None = None,
        log_level: mcp_types.LoggingLevel | None = None,
        progress_handler: ProgressHandler | None = None,
        message_handler: MessageHandlerT | None = None,
        client_info: mcp_types.Implementation | None = None,
        init_timeout: float | None = _UNSET,
        read_timeout: float | None = _UNSET,
        roots: RootsList | RootsHandler[Any] | None = None,
        # HTTP-specific (only used when constructing a default transport from a URL)
        auth: httpx.Auth | Literal['oauth'] | str | None = None,
        verify: ssl.SSLContext | bool | str | None = None,
        headers: dict[str, str] | None = None,
        http_client: httpx.AsyncClient | None = None,
    ):
        """Create an `MCPToolset` around a FastMCP `Client`.

        If `client` is already a `fastmcp.Client` it is stored unchanged; otherwise a transport is
        built from it and wrapped in a new `fastmcp.Client` configured from the keyword arguments.

        Args:
            client: The connection target. See the class docstring for the accepted shapes.
            id: Optional unique identifier of the toolset. Needed in durable execution environments
                such as Temporal or DBOS, where it names the toolset's activities/steps within a
                workflow.
            max_retries: How many times a tool call may be retried after a `ModelRetry`. `None`
                falls back to the agent's retry count at runtime.
            tool_error_behavior: With `'retry'` (default) tool errors become
                [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] so the model can self-correct;
                with `'error'` the underlying exception propagates.
            process_tool_call: Hook wrapping each tool call. See
                [`ProcessToolCallback`][pydantic_ai.mcp.ProcessToolCallback].
            cache_tools: Whether the tool list is cached. See
                [`MCPToolset.cache_tools`][pydantic_ai.mcp.MCPToolset.cache_tools].
            cache_resources: Whether the resource list is cached. See
                [`MCPToolset.cache_resources`][pydantic_ai.mcp.MCPToolset.cache_resources].
            include_instructions: Whether the server's instructions are added to the agent's
                instructions. See
                [`MCPToolset.include_instructions`][pydantic_ai.mcp.MCPToolset.include_instructions].
            include_return_schema: Whether return schemas are included in tool definitions. See
                [`MCPToolset.include_return_schema`][pydantic_ai.mcp.MCPToolset.include_return_schema].
            sampling_model: A Pydantic AI model the server may sample from. Mutually exclusive with
                `sampling_handler`.
            sampling_handler: A FastMCP-shaped sampling handler, for full control over the sampling
                response.
            elicitation_handler: A FastMCP-shaped elicitation handler receiving MCP
                `elicitation/create` requests from the server.
            log_handler: A FastMCP-shaped log handler receiving the server's log messages.
            log_level: Log level requested from the server via `logging/setLevel` after
                initialization.
            progress_handler: A FastMCP-shaped progress handler.
            message_handler: A FastMCP-shaped message handler invoked for every server-sent
                message. Pydantic AI installs its own handler internally to invalidate caches on
                `list_changed` notifications; when you supply one, both run (yours after ours).
            client_info: Description of the MCP client implementation, sent to the server during
                initialization.
            init_timeout: Timeout in seconds for the initial connection and `initialize`
                handshake. Defaults to 5 seconds.
            read_timeout: Maximum time in seconds to wait for new messages on the long-lived
                connection. Defaults to 5 minutes.
            roots: Filesystem roots advertised to the server.
            auth: HTTP authentication for HTTP transports — an `httpx.Auth`, the literal string
                `'oauth'` to enable FastMCP's OAuth flow, or a bearer-token string.
            verify: SSL verification mode for HTTP transports — an `ssl.SSLContext`, a CA bundle
                path string, or a bool.
            headers: Extra HTTP headers for HTTP transports. Mutually exclusive with `http_client`.
            http_client: A pre-configured `httpx.AsyncClient` for HTTP transports — handy for
                self-signed certificates or custom connection pooling. Mutually exclusive with
                `headers`.

        Raises:
            ValueError: If a pre-built `fastmcp.Client` is combined with any keyword argument that
                would otherwise configure a default Client (sampling, elicitation, headers, etc.),
                if both `sampling_model` and `sampling_handler` are given, or if both `headers` and
                `http_client` are given.
            ImportError: If the fastmcp client isn't installed. Install the `mcp` extra (which pulls
                `fastmcp-slim[client]`): `pip install "pydantic-ai-slim[mcp]"`.
        """
        _require_fastmcp()

        if isinstance(client, FastMCPClient):
            # A ready-made Client carries its own configuration, so every argument that would have
            # configured a default Client is rejected rather than silently ignored.
            client_only_kwargs: dict[str, Any] = {
                'sampling_handler': sampling_handler,
                'sampling_model': sampling_model,
                'elicitation_handler': elicitation_handler,
                'log_handler': log_handler,
                'progress_handler': progress_handler,
                'message_handler': message_handler,
                'client_info': client_info,
                'roots': roots,
                'auth': auth,
                'verify': verify,
                'headers': headers,
                'http_client': http_client,
            }
            rejected = [key for key, passed in client_only_kwargs.items() if passed is not None]
            # The timeouts default to the `_UNSET` sentinel, which lets "explicitly passed" be told
            # apart from "left at default" without depending on the literal default values.
            for timeout_name, timeout_value in (('init_timeout', init_timeout), ('read_timeout', read_timeout)):
                if timeout_value is not _UNSET:
                    rejected.append(timeout_name)
            if rejected:
                listed = ', '.join(map(repr, rejected))
                raise ValueError(
                    f'Cannot pass {listed} alongside a pre-built `fastmcp.Client` — '
                    'configure these on the Client itself instead.'
                )
            self.client = client
            self._user_message_handler = None
        else:
            if sampling_model is not None and sampling_handler is not None:
                raise ValueError('Pass either `sampling_model` or `sampling_handler`, not both.')
            if http_client is not None and headers is not None:
                raise ValueError(
                    '`headers` and `http_client` are mutually exclusive — set headers on the `http_client` instead.'
                )

            # With the conflict checks done, swap the sentinels for the real default timeouts.
            effective_init_timeout = 5 if init_timeout is _UNSET else init_timeout
            effective_read_timeout = 5 * 60 if read_timeout is _UNSET else read_timeout

            transport = _build_transport(
                client,
                headers=headers,
                http_client=http_client,
                auth=auth,
                verify=verify,
                read_timeout=effective_read_timeout,
            )

            # An explicit handler wins; otherwise derive one from `sampling_model` when given.
            if sampling_handler is None and sampling_model is not None:
                effective_sampling_handler = _build_sampling_handler(sampling_model)
            else:
                effective_sampling_handler = sampling_handler

            combined_message_handler = _build_message_handler(self, message_handler)

            self.client = FastMCPClient[Any](
                transport=transport,
                sampling_handler=effective_sampling_handler,
                elicitation_handler=elicitation_handler,
                log_handler=log_handler,
                progress_handler=progress_handler,
                message_handler=combined_message_handler,
                client_info=client_info,
                init_timeout=effective_init_timeout,
                timeout=effective_read_timeout,
                roots=roots,
            )
            self._user_message_handler = message_handler

        # Pydantic AI-level configuration, stored as given.
        self._id = id
        self.max_retries = max_retries
        self.tool_error_behavior = tool_error_behavior
        self.process_tool_call = process_tool_call
        self.cache_tools = cache_tools
        self.cache_resources = cache_resources
        self.include_instructions = include_instructions
        self.include_return_schema = include_return_schema
        self.sampling_model = sampling_model
        self.log_level = log_level

        # Session state starts empty; it is filled in by `__aenter__`.
        self._server_info = None
        self._server_capabilities = None
        self._instructions = None
        self._cached_tools = None
        self._cached_resources = None
        self._running_count = 0
        self._exit_stack = None

    @property
    def id(self) -> str | None:
        """The optional identifier of this toolset, as passed to the constructor or assigned later."""
        return self._id

    @id.setter
    def id(self, value: str | None) -> None:
        # Stores the new identifier on the private `_id` attribute that the getter reads.
        self._id = value

    @property
    def label(self) -> str:
        """Label for this toolset: the parent class's label when an `id` is set, else `repr(self)`."""
        if not self.id:
            return repr(self)
        return super().label  # pragma: no cover

    @property
    def tool_name_conflict_hint(self) -> str:
        """Fixed advice string recommending `.prefixed("...")` to disambiguate tool names across MCP servers."""
        return 'Wrap the toolset with `.prefixed("...")` to disambiguate tool names from multiple MCP servers.'

    @property
    def server_info(self) -> mcp_types.Implementation:
        """Implementation info reported by the server during initialization.

        Raises [`AttributeError`][AttributeError] when accessed before the toolset has been entered.
        """
        info = self._server_info
        if info is not None:
            return info
        raise AttributeError(f'`{self.__class__.__name__}.server_info` is only available after initialization.')

    @property
    def capabilities(self) -> ServerCapabilities:
        """Capabilities the server advertised during initialization.

        Raises [`AttributeError`][AttributeError] when accessed before the toolset has been entered.
        """
        advertised = self._server_capabilities
        if advertised is not None:
            return advertised
        raise AttributeError(f'`{self.__class__.__name__}.capabilities` is only available after initialization.')

    @property
    def instructions(self) -> str | None:
        """Instructions string the server sent during initialization, or `None` if it sent none.

        Raises [`AttributeError`][AttributeError] when accessed before the toolset has been entered.
        """
        if self._initialized:
            return self._instructions
        raise AttributeError(f'`{self.__class__.__name__}.instructions` is only available after initialization.')

    @property
    def is_running(self) -> bool:
        """Whether the toolset is currently entered (the FastMCP session is open)."""
        # `_running_count` is incremented by `__aenter__` and decremented by `__aexit__`, so a
        # positive value means at least one caller is still inside the toolset's context.
        return self._running_count > 0

    @property
    def _initialized(self) -> bool:
        """Whether `_server_info` is populated — it is set by `__aenter__` and cleared by the final `__aexit__`."""
        return self._server_info is not None

    def _invalidate_tools_cache(self) -> None:
        """Drop the cached tool list so the next `list_tools()` call fetches it from the server again."""
        self._cached_tools = None

    def _invalidate_resources_cache(self) -> None:
        """Drop the cached resource list so the next `list_resources()` call fetches it from the server again."""
        self._cached_resources = None

    async def __aenter__(self) -> Self:
        """Enter the toolset, opening the FastMCP session on the first (outermost) entry.

        Entries are reference-counted under `_enter_lock`: only the entry that finds
        `_running_count == 0` opens the client and records the server's info, capabilities and
        instructions; every entry then increments the count.

        Returns:
            This toolset.
        """
        async with self._enter_lock:
            if self._running_count == 0:
                # Build the exit stack inside an `async with` so any failure after
                # `enter_async_context(self.client)` cleans up the open session — only commit the
                # stack and write `_server_info`/`_server_capabilities`/`_instructions` to `self`
                # once initialization fully succeeds, so `_initialized` can't see stale data from a
                # session that got torn down mid-setup.
                async with AsyncExitStack() as exit_stack:
                    await exit_stack.enter_async_context(self.client)
                    init_result = self.client.initialize_result
                    assert init_result is not None, 'FastMCP Client initialization returned no result'
                    server_info = init_result.serverInfo
                    server_capabilities = ServerCapabilities.from_mcp_sdk(init_result.capabilities)
                    instructions = init_result.instructions
                    # Only request a log level when one was configured on the toolset.
                    if self.log_level is not None:
                        await self.client.session.set_logging_level(self.log_level)
                    # `pop_all()` moves the registered cleanup onto a new stack, so leaving the
                    # `async with` no longer closes the session.
                    self._exit_stack = exit_stack.pop_all()
                    self._server_info = server_info
                    self._server_capabilities = server_capabilities
                    self._instructions = instructions
            self._running_count += 1
        return self

    async def __aexit__(self, *args: Any) -> bool | None:
        """Leave the toolset; the exit that balances the first entry closes the session.

        Decrements the entry count under `_enter_lock`. When it reaches zero, the exit stack
        committed by `__aenter__` is closed and the session-derived state (server info,
        capabilities, instructions and both caches) is cleared.

        Returns:
            `None`, so exceptions raised inside the `async with` body are never suppressed.

        Raises:
            ValueError: If called more times than `__aenter__`.
        """
        async with self._enter_lock:
            if self._running_count == 0:
                raise ValueError(f'`{self.__class__.__name__}.__aexit__` called more times than `__aenter__`')
            self._running_count -= 1
            if self._running_count == 0 and self._exit_stack is not None:
                # Detach the stack before closing and reset the state in `finally`: if `aclose()`
                # raises or is cancelled, the toolset must not keep reporting itself as initialized
                # or keep serving cached tools/resources from a session that is gone.
                exit_stack, self._exit_stack = self._exit_stack, None
                try:
                    await exit_stack.aclose()
                finally:
                    self._server_info = None
                    self._server_capabilities = None
                    self._instructions = None
                    self._cached_tools = None
                    self._cached_resources = None
        return None

    async def get_instructions(self, ctx: RunContext[AgentDepsT]) -> messages.InstructionPart | None:
        """Return the server's instructions as an instruction part, when enabled and available.

        Yields `None` if `include_instructions` is off, the toolset has not been initialized, or
        the server sent no instructions.
        """
        if not (self.include_instructions and self._initialized):
            return None
        server_text = self._instructions
        if server_text is None:
            return None
        # The text is captured once in `__aenter__` and stays the same for as long as the toolset
        # remains entered, so from the agent's point of view it is static rather than dynamic.
        return messages.InstructionPart(content=server_text, dynamic=False)

    async def list_tools(self) -> list[mcp_types.Tool]:
        """Fetch the tools the server currently exposes.

        With [`cache_tools`][pydantic_ai.mcp.MCPToolset.cache_tools] enabled (default), the result
        is cached and invalidated by `notifications/tools/list_changed` or the toolset's last
        `__aexit__`.
        """
        cached = self._cached_tools
        if self.cache_tools and cached is not None:
            return cached
        async with self:
            fetched = await self.client.list_tools()
            if self.cache_tools:
                self._cached_tools = fetched
            return fetched

    async def get_tools(self, ctx: RunContext[AgentDepsT]) -> dict[str, ToolsetTool[AgentDepsT]]:
        """Build a `ToolsetTool` for every tool returned by `list_tools()`, keyed by tool name.

        Each tool uses the toolset's `max_retries` when set, otherwise `ctx.max_retries`.
        """
        retry_budget = ctx.max_retries if self.max_retries is None else self.max_retries

        def as_toolset_tool(mcp_tool: mcp_types.Tool) -> ToolsetTool[AgentDepsT]:
            # `task` metadata is true only when the tool's task support is 'required' or 'optional'.
            execution = mcp_tool.execution
            task_support = execution.taskSupport if execution else None
            annotations = mcp_tool.annotations
            definition = ToolDefinition(
                name=mcp_tool.name,
                description=mcp_tool.description,
                parameters_json_schema=mcp_tool.inputSchema,
                metadata={
                    'meta': mcp_tool.meta,
                    'annotations': annotations.model_dump() if annotations else None,
                    'task': task_support in ('required', 'optional'),
                },
                return_schema=mcp_tool.outputSchema or None,
                include_return_schema=self.include_return_schema,
            )
            return ToolsetTool[AgentDepsT](
                toolset=self,
                tool_def=definition,
                max_retries=retry_budget,
                args_validator=TOOL_SCHEMA_VALIDATOR,
            )

        listed = await self.list_tools()
        return {mcp_tool.name: as_toolset_tool(mcp_tool) for mcp_tool in listed}

    def tool_for_tool_def(self, tool_def: ToolDefinition) -> ToolsetTool[AgentDepsT]:
        """Wrap an existing `ToolDefinition` in a `ToolsetTool` bound to this toolset.

        Uses the toolset's `max_retries`, falling back to 1 when it is `None`.
        """
        retry_budget = 1 if self.max_retries is None else self.max_retries
        return ToolsetTool[AgentDepsT](
            toolset=self,
            tool_def=tool_def,
            max_retries=retry_budget,
            args_validator=TOOL_SCHEMA_VALIDATOR,
        )

    async def direct_call_tool(
        self,
        name: str,
        args: dict[str, Any],
        *,
        metadata: dict[str, Any] | None = None,
        use_task: bool = False,
    ) -> Any:
        """Invoke a tool on the server directly and return its mapped result.

        Args:
            name: The name of the tool to call.
            args: The arguments to pass to the tool.
            metadata: Optional request-level `_meta` payload sent alongside the call.
            use_task: When `True`, send the call with `task=True` per MCP
                [SEP-1686](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks) so
                the server wraps execution in a durable, cancelable, pollable task; the result is awaited via
                `tasks/result`. Only valid for tools whose `execution.taskSupport` is `'required'` or `'optional'`.

        Raises:
            ModelRetry: If the tool errors and `tool_error_behavior='retry'` (the default).
            fastmcp.exceptions.ToolError: If the tool errors and `tool_error_behavior='error'`.
        """
        async with self:
            try:
                if not use_task:
                    result: CallToolResult = await self.client.call_tool(name=name, arguments=args, meta=metadata)
                else:
                    tool_task: ToolTask = await self.client.call_tool(
                        name=name, arguments=args, task=True, meta=metadata
                    )
                    result = await tool_task.result()
            except ToolError as e:
                if self.tool_error_behavior != 'retry':
                    raise
                raise exceptions.ModelRetry(message=str(e)) from e

        # Structured content is preferred only when every content part is text (per the docs those
        # text parts carry the JSON-encoded structured content for backward compatibility).
        # See https://github.com/modelcontextprotocol/python-sdk#structured-output
        structured = result.structured_content
        if not structured or not all(isinstance(part, mcp_types.TextContent) for part in result.content):
            return _map_mcp_tool_results(result.content)

        # The MCP SDK wraps primitives and generic types like list in a `result` key; unwrap it so
        # callers get the raw value returned by the tool function.
        if isinstance(structured, dict) and len(structured) == 1 and 'result' in structured:
            return structured['result']
        return structured

    async def call_tool(
        self,
        name: str,
        tool_args: dict[str, Any],
        ctx: RunContext[Any],
        tool: ToolsetTool[Any],
    ) -> Any:
        """Call a tool, routing through `process_tool_call` when one is configured."""
        # Whether to use server-side task-augmented execution (MCP SEP-1686) is decided solely by
        # the tool's `execution.taskSupport`: 'required'/'optional' → task path; 'forbidden' or
        # absent → regular path. That decision is carried in the tool definition's `task` metadata.
        task_flag = bool((tool.tool_def.metadata or {}).get('task'))
        hook = self.process_tool_call
        if hook is None:
            return await self.direct_call_tool(name, tool_args, use_task=task_flag)
        bound_call = functools.partial(self.direct_call_tool, use_task=task_flag)
        return await hook(ctx, bound_call, name, tool_args)

    async def list_resources(self) -> list[Resource]:
        """Fetch the resources the server currently exposes.

        With [`cache_resources`][pydantic_ai.mcp.MCPToolset.cache_resources] enabled (default),
        the result is cached and invalidated by `notifications/resources/list_changed` or the
        toolset's last `__aexit__`.

        Returns an empty list if the server does not advertise the `resources` capability.

        Raises:
            MCPError: If the server returns an error.
        """
        cached = self._cached_resources
        if self.cache_resources and cached is not None:
            return cached
        async with self:
            if not self.capabilities.resources:
                return []
            try:
                sdk_resources = await self.client.list_resources()
            except mcp_exceptions.McpError as sdk_error:
                # Re-raise SDK errors as this module's own `MCPError`, keeping the cause chained.
                raise MCPError.from_mcp_sdk(sdk_error) from sdk_error
            converted = [Resource.from_mcp_sdk(item) for item in sdk_resources]
            if self.cache_resources:
                self._cached_resources = converted
            return converted

    async def list_resource_templates(self) -> list[ResourceTemplate]:
        """List the resource templates the server currently exposes.

        An empty list is returned when the server does not advertise the `resources` capability.

        Raises:
            MCPError: If the server returns an error.
        """
        async with self:
            if not self.capabilities.resources:
                return []
            try:
                raw_templates = await self.client.list_resource_templates()
            except mcp_exceptions.McpError as sdk_error:
                # Re-raise SDK errors as the library's own error type, keeping the cause chained.
                raise MCPError.from_mcp_sdk(sdk_error) from sdk_error

        # Conversion happens after the `async with` block has exited.
        converted = []
        for raw in raw_templates:
            converted.append(ResourceTemplate.from_mcp_sdk(raw))
        return converted

    @overload
    async def read_resource(
        self, uri: str
    ) -> str | messages.BinaryContent | list[str | messages.BinaryContent]: ...

    @overload
    async def read_resource(
        self, uri: Resource
    ) -> str | messages.BinaryContent | list[str | messages.BinaryContent]: ...

    async def read_resource(
        self, uri: str | Resource
    ) -> str | messages.BinaryContent | list[str | messages.BinaryContent]:
        """Fetch the contents of one resource from the server.

        Args:
            uri: Either the resource URI as a string, or a
                [`Resource`][pydantic_ai.mcp.Resource] whose `uri` attribute is used.

        Returns:
            A single converted value when the server returns exactly one content item,
            otherwise a list of converted values. Text content is returned as `str`, binary
            content as [`BinaryContent`][pydantic_ai.messages.BinaryContent].

        Raises:
            MCPError: If the server returns an error.
        """
        if isinstance(uri, str):
            target_uri = uri
        else:
            target_uri = uri.uri

        async with self:
            try:
                items = await self.client.read_resource(AnyUrl(target_uri))
            except mcp_exceptions.McpError as sdk_error:
                # Re-raise SDK errors as the library's own error type, keeping the cause chained.
                raise MCPError.from_mcp_sdk(sdk_error) from sdk_error

        if len(items) == 1:
            return _resource_content_to_pai(items[0])
        return [_resource_content_to_pai(item) for item in items]

    def __repr__(self) -> str:
        """Return `ClassName(client=..., id=...)`; the `id` part is omitted when `_id` is `None`."""
        shown = f'client={self.client!r}'
        if self._id is not None:
            shown += f', id={self._id!r}'
        return f'{self.__class__.__name__}({shown})'

    def __eq__(self, value: object, /) -> bool:
        """Compare equal only to another `MCPToolset` with an equal `_id` and the very same client object."""
        if not isinstance(value, MCPToolset):
            return False
        ids_match = self._id == value._id
        # Clients are compared by identity, mirroring the `id(self.client)` used by `__hash__`.
        return ids_match and self.client is value.client

    def __hash__(self) -> int:
        """Hash on the toolset id and the identity of the client object, consistent with `__eq__`."""
        identity_key = (self._id, id(self.client))
        return hash(identity_key)

client instance-attribute

client: Client[Any]

The underlying FastMCP Client. Always normalized to a fastmcp.Client regardless of how the toolset was constructed.

__init__

__init__(
    client: MCPToolsetClient,
    *,
    id: str | None = None,
    max_retries: int | None = None,
    tool_error_behavior: Literal[
        "retry", "error"
    ] = "retry",
    process_tool_call: ProcessToolCallback | None = None,
    cache_tools: bool = True,
    cache_resources: bool = True,
    include_instructions: bool = False,
    include_return_schema: bool | None = None,
    sampling_model: Model | None = None,
    sampling_handler: (
        SamplingHandler[Any, Any] | None
    ) = None,
    elicitation_handler: (
        ElicitationHandler[Any, Any] | None
    ) = None,
    log_handler: LogHandler | None = None,
    log_level: LoggingLevel | None = None,
    progress_handler: ProgressHandler | None = None,
    message_handler: MessageHandlerT | None = None,
    client_info: Implementation | None = None,
    init_timeout: float | None = _UNSET,
    read_timeout: float | None = _UNSET,
    roots: RootsList | RootsHandler[Any] | None = None,
    auth: Auth | Literal["oauth"] | str | None = None,
    verify: SSLContext | bool | str | None = None,
    headers: dict[str, str] | None = None,
    http_client: AsyncClient | None = None
)

Build a new MCPToolset.

Parameters:

Name Type Description Default
client MCPToolsetClient

How to connect to the MCP server. See the class docstring for accepted shapes.

required
id str | None

An optional unique identifier for this toolset. Required for use in durable execution environments like Temporal or DBOS, where it identifies the toolset's activities/steps within a workflow.

None
max_retries int | None

Maximum number of times a tool call may be retried after a ModelRetry. None inherits the agent's retry count at runtime.

None
tool_error_behavior Literal['retry', 'error']

'retry' (default) raises ModelRetry on tool errors so the model can self-correct; 'error' propagates the underlying exception.

'retry'
process_tool_call ProcessToolCallback | None

Hook to wrap tool calls. See ProcessToolCallback.

None
cache_tools bool

Whether to cache the list of tools. See MCPToolset.cache_tools.

True
cache_resources bool

Whether to cache the list of resources. See MCPToolset.cache_resources.

True
include_instructions bool

Whether to include the server's instructions in the agent's instructions. See MCPToolset.include_instructions.

False
include_return_schema bool | None

Whether to include return schemas in tool definitions. See MCPToolset.include_return_schema.

None
sampling_model Model | None

A Pydantic AI model the server may sample from. Mutually exclusive with sampling_handler.

None
sampling_handler SamplingHandler[Any, Any] | None

A FastMCP-shaped sampling handler. Use for full control over the sampling response.

None
elicitation_handler ElicitationHandler[Any, Any] | None

A FastMCP-shaped elicitation handler that receives MCP elicitation/create requests from the server.

None
log_handler LogHandler | None

A FastMCP-shaped log handler that receives log messages from the server.

None
log_level LoggingLevel | None

Log level requested from the server via logging/setLevel after initialization.

None
progress_handler ProgressHandler | None

A FastMCP-shaped progress handler.

None
message_handler MessageHandlerT | None

A FastMCP-shaped message handler called for every server-sent message. Pydantic AI installs its own message handler internally to invalidate caches on list_changed notifications; if you provide one, both run (yours after ours).

None
client_info Implementation | None

Information describing the MCP client implementation, sent to the server during initialization.

None
init_timeout float | None

Timeout in seconds for the initial connection and initialize handshake.

_UNSET
read_timeout float | None

Maximum time in seconds to wait for new messages on the long-lived connection. Defaults to 5 minutes.

_UNSET
roots RootsList | RootsHandler[Any] | None

Filesystem roots advertised to the server.

None
auth Auth | Literal['oauth'] | str | None

HTTP authentication for HTTP transports — an httpx.Auth, the literal string 'oauth' to enable FastMCP's OAuth flow, or a bearer-token string.

None
verify SSLContext | bool | str | None

SSL verification mode for HTTP transports — an ssl.SSLContext, a CA bundle path string, or a bool.

None
headers dict[str, str] | None

Extra HTTP headers for HTTP transports. Mutually exclusive with http_client.

None
http_client AsyncClient | None

A pre-configured httpx.AsyncClient to use for HTTP transports — useful for self-signed certificates or custom connection pooling. Mutually exclusive with headers.

None

Raises:

Type Description
ValueError

If a pre-built fastmcp.Client is passed alongside any of the kwargs that would otherwise build a default Client (sampling, elicitation, headers, etc.), or if sampling_model and sampling_handler are both passed, or if headers and http_client are both passed.

ImportError

If the fastmcp client isn't installed. Install the mcp extra (which pulls fastmcp-slim[client]): pip install "pydantic-ai-slim[mcp]".

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
def __init__(
    self,
    client: MCPToolsetClient,
    *,
    # Pydantic AI-layer config
    id: str | None = None,
    max_retries: int | None = None,
    tool_error_behavior: Literal['retry', 'error'] = 'retry',
    process_tool_call: ProcessToolCallback | None = None,
    cache_tools: bool = True,
    cache_resources: bool = True,
    include_instructions: bool = False,
    include_return_schema: bool | None = None,
    # Sampling — high-level shortcut and low-level escape hatch
    sampling_model: models.Model | None = None,
    sampling_handler: SamplingHandler[Any, Any] | None = None,
    # MCP protocol kwargs (forwarded to a default FastMCP Client when one isn't passed)
    elicitation_handler: ElicitationHandler[Any, Any] | None = None,
    log_handler: LogHandler | None = None,
    log_level: mcp_types.LoggingLevel | None = None,
    progress_handler: ProgressHandler | None = None,
    message_handler: MessageHandlerT | None = None,
    client_info: mcp_types.Implementation | None = None,
    init_timeout: float | None = _UNSET,
    read_timeout: float | None = _UNSET,
    roots: RootsList | RootsHandler[Any] | None = None,
    # HTTP-specific (only used when constructing a default transport from a URL)
    auth: httpx.Auth | Literal['oauth'] | str | None = None,
    verify: ssl.SSLContext | bool | str | None = None,
    headers: dict[str, str] | None = None,
    http_client: httpx.AsyncClient | None = None,
):
    """Build a new `MCPToolset`.

    A pre-built `fastmcp.Client` is stored as-is; any other `client` value is turned into a
    transport and wrapped in a default FastMCP client configured from the keyword arguments.

    Args:
        client: How to connect to the MCP server. See the class docstring for accepted shapes.
        id: An optional unique identifier for this toolset. Required for use in durable
            execution environments like Temporal or DBOS, where it identifies the toolset's
            activities/steps within a workflow.
        max_retries: Maximum number of times a tool call may be retried after a `ModelRetry`.
            `None` inherits the agent's retry count at runtime.
        tool_error_behavior: `'retry'` (default) raises
            [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] on tool errors so the model can
            self-correct; `'error'` propagates the underlying exception.
        process_tool_call: Hook to wrap tool calls. See
            [`ProcessToolCallback`][pydantic_ai.mcp.ProcessToolCallback].
        cache_tools: Whether to cache the list of tools. See
            [`MCPToolset.cache_tools`][pydantic_ai.mcp.MCPToolset.cache_tools].
        cache_resources: Whether to cache the list of resources. See
            [`MCPToolset.cache_resources`][pydantic_ai.mcp.MCPToolset.cache_resources].
        include_instructions: Whether to include the server's instructions in the agent's
            instructions. See
            [`MCPToolset.include_instructions`][pydantic_ai.mcp.MCPToolset.include_instructions].
        include_return_schema: Whether to include return schemas in tool definitions. See
            [`MCPToolset.include_return_schema`][pydantic_ai.mcp.MCPToolset.include_return_schema].
        sampling_model: A Pydantic AI model the server may sample from. Mutually exclusive with
            `sampling_handler`.
        sampling_handler: A FastMCP-shaped sampling handler. Use for full control over the
            sampling response.
        elicitation_handler: A FastMCP-shaped elicitation handler that receives MCP
            `elicitation/create` requests from the server.
        log_handler: A FastMCP-shaped log handler that receives log messages from the server.
        log_level: Log level requested from the server via `logging/setLevel` after
            initialization.
        progress_handler: A FastMCP-shaped progress handler.
        message_handler: A FastMCP-shaped message handler called for every server-sent message.
            Pydantic AI installs its own message handler internally to invalidate caches on
            `list_changed` notifications; if you provide one, both run (yours after ours).
        client_info: Information describing the MCP client implementation, sent to the server
            during initialization.
        init_timeout: Timeout in seconds for the initial connection and `initialize` handshake.
            Resolved to 5 when left unset and a default client is built.
        read_timeout: Maximum time in seconds to wait for new messages on the long-lived
            connection. Defaults to 5 minutes.
        roots: Filesystem roots advertised to the server.
        auth: HTTP authentication for HTTP transports — an `httpx.Auth`, the literal string
            `'oauth'` to enable FastMCP's OAuth flow, or a bearer-token string.
        verify: SSL verification mode for HTTP transports — an `ssl.SSLContext`, a CA bundle
            path string, or a bool.
        headers: Extra HTTP headers for HTTP transports. Mutually exclusive with `http_client`.
        http_client: A pre-configured `httpx.AsyncClient` to use for HTTP transports — useful
            for self-signed certificates or custom connection pooling. Mutually exclusive with
            `headers`.

    Raises:
        ValueError: If a pre-built `fastmcp.Client` is passed alongside any of the kwargs that
            would otherwise build a default Client (sampling, elicitation, headers, etc.), or
            if `sampling_model` and `sampling_handler` are both passed, or if `headers` and
            `http_client` are both passed.
        ImportError: If the fastmcp client isn't installed. Install the `mcp` extra (which pulls
            `fastmcp-slim[client]`): `pip install "pydantic-ai-slim[mcp]"`.
    """
    _require_fastmcp()

    if isinstance(client, FastMCPClient):
        # Everything below only makes sense when we build the Client ourselves, so passing any
        # of it next to a ready-made Client is rejected instead of being silently ignored.
        client_only_kwargs: dict[str, Any] = {
            'sampling_handler': sampling_handler,
            'sampling_model': sampling_model,
            'elicitation_handler': elicitation_handler,
            'log_handler': log_handler,
            'progress_handler': progress_handler,
            'message_handler': message_handler,
            'client_info': client_info,
            'roots': roots,
            'auth': auth,
            'verify': verify,
            'headers': headers,
            'http_client': http_client,
        }
        rejected = [key for key, passed in client_only_kwargs.items() if passed is not None]
        # The timeouts default to the `_UNSET` sentinel so "explicitly passed" can be told
        # apart from "left at the default" without depending on the literal default values.
        if init_timeout is not _UNSET:
            rejected.append('init_timeout')
        if read_timeout is not _UNSET:
            rejected.append('read_timeout')
        if rejected:
            names = ', '.join(map(repr, rejected))
            raise ValueError(
                f'Cannot pass {names} alongside a pre-built `fastmcp.Client` — '
                'configure these on the Client itself instead.'
            )
        self.client = client
        self._user_message_handler = None
    else:
        if sampling_handler is not None and sampling_model is not None:
            raise ValueError('Pass either `sampling_model` or `sampling_handler`, not both.')
        if headers is not None and http_client is not None:
            raise ValueError(
                '`headers` and `http_client` are mutually exclusive — set headers on the `http_client` instead.'
            )

        # With validation done, swap the sentinels for the real default timeouts.
        effective_init_timeout = 5 if init_timeout is _UNSET else init_timeout
        effective_read_timeout = 5 * 60 if read_timeout is _UNSET else read_timeout

        transport = _build_transport(
            client,
            headers=headers,
            http_client=http_client,
            auth=auth,
            verify=verify,
            read_timeout=effective_read_timeout,
        )

        # An explicit handler wins; otherwise derive one from `sampling_model` when given.
        if sampling_handler is None and sampling_model is not None:
            effective_sampling_handler = _build_sampling_handler(sampling_model)
        else:
            effective_sampling_handler = sampling_handler

        combined_message_handler = _build_message_handler(self, message_handler)

        self.client = FastMCPClient[Any](
            transport=transport,
            sampling_handler=effective_sampling_handler,
            elicitation_handler=elicitation_handler,
            log_handler=log_handler,
            progress_handler=progress_handler,
            message_handler=combined_message_handler,
            client_info=client_info,
            init_timeout=effective_init_timeout,
            timeout=effective_read_timeout,
            roots=roots,
        )
        self._user_message_handler = message_handler

    # Public, user-facing configuration.
    self._id = id
    self.max_retries = max_retries
    self.tool_error_behavior = tool_error_behavior
    self.process_tool_call = process_tool_call
    self.cache_tools = cache_tools
    self.cache_resources = cache_resources
    self.include_instructions = include_instructions
    self.include_return_schema = include_return_schema
    self.sampling_model = sampling_model
    self.log_level = log_level

    # Private runtime state, all starting out empty.
    self._server_info = None
    self._server_capabilities = None
    self._instructions = None
    self._cached_tools = None
    self._cached_resources = None
    self._running_count = 0
    self._exit_stack = None

max_retries instance-attribute

max_retries: int | None = max_retries

Maximum number of times a tool call may be retried after a ModelRetry.

None (default) inherits the agent's retry count at runtime. Set explicitly to override.

tool_error_behavior instance-attribute

tool_error_behavior: Literal["retry", "error"] = (
    tool_error_behavior
)

How to handle tool errors raised by the server.

'retry' (default) raises ModelRetry so the model can self-correct; 'error' propagates the underlying fastmcp.exceptions.ToolError to the caller.

process_tool_call instance-attribute

process_tool_call: ProcessToolCallback | None = (
    process_tool_call
)

Hook to wrap tool calls — useful for adding request-level metadata, custom retry policies, or telemetry. See ProcessToolCallback.

cache_tools instance-attribute

cache_tools: bool = cache_tools

Whether to cache the list of tools across get_tools() calls.

When enabled (default), tools are fetched once and cached until either:

  • The server sends a notifications/tools/list_changed notification
  • The toolset is fully exited (last __aexit__ matches the first __aenter__)

Set to False for servers that change tools dynamically without sending notifications, or when passing a pre-built FastMCP Client (the cache-invalidation message handler isn't installed in that case, so caches are only invalidated by session close).

cache_resources instance-attribute

cache_resources: bool = cache_resources

Whether to cache the list of resources across list_resources() calls.

Same semantics as cache_tools but for notifications/resources/list_changed notifications.

include_instructions instance-attribute

include_instructions: bool = include_instructions

Whether to include the server's initialize instructions string in the agent's instruction set.

Defaults to False for backward compatibility. When True, the instructions returned by the server during initialization are added to the agent's instructions.

include_return_schema instance-attribute

include_return_schema: bool | None = include_return_schema

Whether to include each tool's outputSchema in the schema sent to the model.

When None (the default), defaults to False unless the IncludeToolReturnSchemas capability is used.

sampling_model instance-attribute

sampling_model: Model | None = sampling_model

A Pydantic AI model that the server may sample from via the MCP sampling/createMessage flow.

When set (and no explicit sampling_handler is passed), Pydantic AI builds a sampling handler that delegates to this model with the request's maxTokens/temperature/stopSequences settings applied. If both sampling_model and sampling_handler are passed, an error is raised.

log_level instance-attribute

log_level: LoggingLevel | None = log_level

Log level requested from the server via logging/setLevel after initialization.

None (default) leaves the server's default log level alone. Combine with log_handler to receive log messages.

server_info property

server_info: Implementation

The server-implementation info sent during initialization.

Raises AttributeError when accessed before the toolset has been entered.

capabilities property

capabilities: ServerCapabilities

The capabilities advertised by the server during initialization.

Raises AttributeError when accessed before the toolset has been entered.

instructions property

instructions: str | None

The instructions sent by the server during initialization.

Raises AttributeError when accessed before the toolset has been entered.

is_running property

is_running: bool

Whether the toolset is currently entered (the FastMCP session is open).

get_instructions async

get_instructions(
    ctx: RunContext[AgentDepsT],
) -> InstructionPart | None

Return the server's instructions if include_instructions is enabled.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
2060
2061
2062
2063
2064
2065
2066
2067
2068
async def get_instructions(self, ctx: RunContext[AgentDepsT]) -> messages.InstructionPart | None:
    """Return the server's instructions as an instruction part, or `None`.

    `None` is returned when `include_instructions` is disabled, when the toolset is not
    initialized, or when no instructions were stored.
    """
    if self.include_instructions and self._initialized and self._instructions is not None:
        # Instructions are captured once during `__aenter__` and don't change across runs while
        # the toolset stays entered — so they're static from the agent's perspective, not dynamic.
        return messages.InstructionPart(content=self._instructions, dynamic=False)
    return None

list_tools async

list_tools() -> list[Tool]

Retrieve the tools currently exposed by the server.

When cache_tools is enabled (default), results are cached and invalidated by notifications/tools/list_changed or the toolset's last __aexit__.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
async def list_tools(self) -> list[mcp_types.Tool]:
    """List the tools the server currently exposes.

    With [`cache_tools`][pydantic_ai.mcp.MCPToolset.cache_tools] enabled (default), a
    previously fetched list is returned without contacting the server; the cache is
    invalidated by `notifications/tools/list_changed` or the toolset's last `__aexit__`.
    """
    if self.cache_tools:
        cached = self._cached_tools
        if cached is not None:
            return cached

    async with self:
        fetched = await self.client.list_tools()
        if self.cache_tools:
            self._cached_tools = fetched
    return fetched

direct_call_tool async

direct_call_tool(
    name: str,
    args: dict[str, Any],
    *,
    metadata: dict[str, Any] | None = None,
    use_task: bool = False
) -> Any

Call a tool on the server directly.

Parameters:

Name Type Description Default
name str

The name of the tool to call.

required
args dict[str, Any]

The arguments to pass to the tool.

required
metadata dict[str, Any] | None

Optional request-level _meta payload sent alongside the call.

None
use_task bool

When True, send the call with task=True per MCP SEP-1686 so the server wraps execution in a durable, cancelable, pollable task; the result is awaited via tasks/result. Only valid for tools whose execution.taskSupport is 'required' or 'optional'.

False

Raises:

Type Description
ModelRetry

If the tool errors and tool_error_behavior='retry' (the default).

ToolError

If the tool errors and tool_error_behavior='error'.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
async def direct_call_tool(
    self,
    name: str,
    args: dict[str, Any],
    *,
    metadata: dict[str, Any] | None = None,
    use_task: bool = False,
) -> Any:
    """Invoke a tool on the server, bypassing `process_tool_call`.

    Args:
        name: The name of the tool to call.
        args: The arguments to pass to the tool.
        metadata: Optional request-level `_meta` payload sent alongside the call.
        use_task: When `True`, send the call with `task=True` per MCP
            [SEP-1686](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks) so
            the server wraps execution in a durable, cancelable, pollable task; the result is awaited via
            `tasks/result`. Only valid for tools whose `execution.taskSupport` is `'required'` or `'optional'`.

    Returns:
        The structured content when it is present and every content part is text (unwrapped
        from a lone `result` key if that is its only key); otherwise the content parts mapped
        by `_map_mcp_tool_results`.

    Raises:
        ModelRetry: If the tool errors and `tool_error_behavior='retry'` (the default).
        fastmcp.exceptions.ToolError: If the tool errors and `tool_error_behavior='error'`.
    """
    async with self:
        try:
            if not use_task:
                outcome: CallToolResult = await self.client.call_tool(name=name, arguments=args, meta=metadata)
            else:
                pending: ToolTask = await self.client.call_tool(
                    name=name, arguments=args, task=True, meta=metadata
                )
                outcome = await pending.result()
        except ToolError as tool_error:
            if self.tool_error_behavior != 'retry':
                raise
            raise exceptions.ModelRetry(message=str(tool_error)) from tool_error

    # Prefer structured content if all parts are text (per the docs they contain the JSON-encoded
    # structured content for backward compatibility).
    # See https://github.com/modelcontextprotocol/python-sdk#structured-output
    structured = outcome.structured_content
    if structured and all(isinstance(part, mcp_types.TextContent) for part in outcome.content):
        # The MCP SDK wraps primitives and generic types like list in a `result` key, but we want
        # the raw value returned by the tool function.
        is_result_wrapper = isinstance(structured, dict) and len(structured) == 1 and 'result' in structured
        return structured['result'] if is_result_wrapper else structured

    return _map_mcp_tool_results(outcome.content)

list_resources async

list_resources() -> list[Resource]

Retrieve the resources currently exposed by the server.

When cache_resources is enabled (default), results are cached and invalidated by notifications/resources/list_changed or the toolset's last __aexit__.

Returns an empty list if the server does not advertise the resources capability.

Raises:

Type Description
MCPError

If the server returns an error.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
async def list_resources(self) -> list[Resource]:
    """List the resources the server currently exposes.

    With [`cache_resources`][pydantic_ai.mcp.MCPToolset.cache_resources] enabled (default),
    a previously fetched list is returned without contacting the server; the cache is
    invalidated by `notifications/resources/list_changed` or the toolset's last `__aexit__`.

    An empty list is returned (and not cached) when the server does not advertise the
    `resources` capability.

    Raises:
        MCPError: If the server returns an error.
    """
    if self.cache_resources:
        cached = self._cached_resources
        if cached is not None:
            return cached

    async with self:
        if not self.capabilities.resources:
            return []
        try:
            raw_resources = await self.client.list_resources()
        except mcp_exceptions.McpError as sdk_error:
            # Re-raise SDK errors as the library's own error type, keeping the cause chained.
            raise MCPError.from_mcp_sdk(sdk_error) from sdk_error

        converted = []
        for raw in raw_resources:
            converted.append(Resource.from_mcp_sdk(raw))
        if self.cache_resources:
            self._cached_resources = converted
        return converted

list_resource_templates async

list_resource_templates() -> list[ResourceTemplate]

Retrieve the resource templates currently exposed by the server.

Returns an empty list if the server does not advertise the resources capability.

Raises:

Type Description
MCPError

If the server returns an error.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
async def list_resource_templates(self) -> list[ResourceTemplate]:
    """List the resource templates the server currently exposes.

    An empty list is returned when the server does not advertise the `resources` capability.

    Raises:
        MCPError: If the server returns an error.
    """
    async with self:
        if not self.capabilities.resources:
            return []
        try:
            raw_templates = await self.client.list_resource_templates()
        except mcp_exceptions.McpError as sdk_error:
            # Re-raise SDK errors as the library's own error type, keeping the cause chained.
            raise MCPError.from_mcp_sdk(sdk_error) from sdk_error

    # Conversion happens after the `async with` block has exited.
    converted = []
    for raw in raw_templates:
        converted.append(ResourceTemplate.from_mcp_sdk(raw))
    return converted

read_resource async

read_resource(
    uri: str,
) -> str | BinaryContent | list[str | BinaryContent]
read_resource(
    uri: Resource,
) -> str | BinaryContent | list[str | BinaryContent]
read_resource(
    uri: str | Resource,
) -> str | BinaryContent | list[str | BinaryContent]

Read the contents of a specific resource by URI.

Parameters:

Name Type Description Default
uri str | Resource

The URI of the resource to read, or a Resource object.

required

Returns:

Type Description
str | BinaryContent | list[str | BinaryContent]

The resource contents — a single value if the resource has one content item, or a list otherwise. Text content is returned as str, binary content as BinaryContent.

Raises:

Type Description
MCPError

If the server returns an error.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
async def read_resource(
    self, uri: str | Resource
) -> str | messages.BinaryContent | list[str | messages.BinaryContent]:
    """Fetch the contents of one resource from the server.

    Args:
        uri: Either the resource URI as a string, or a [`Resource`][pydantic_ai.mcp.Resource]
            object whose `uri` attribute is used.

    Returns:
        The converted content item itself when the server returns exactly one item,
        otherwise a list of converted items. Text content is returned as `str`, binary
        content as [`BinaryContent`][pydantic_ai.messages.BinaryContent].

    Raises:
        MCPError: If the server returns an error.
    """
    if isinstance(uri, str):
        target_uri = uri
    else:
        target_uri = uri.uri

    async with self:
        try:
            items = await self.client.read_resource(AnyUrl(target_uri))
        except mcp_exceptions.McpError as sdk_error:
            # Re-raise as this package's error type, keeping the SDK error as the cause.
            raise MCPError.from_mcp_sdk(sdk_error) from sdk_error

    # A lone content item is unwrapped; any other count is returned as a list.
    if len(items) == 1:
        return _resource_content_to_pai(items[0])
    return [_resource_content_to_pai(item) for item in items]

load_mcp_servers deprecated

load_mcp_servers(
    config_path: str | Path,
) -> list[
    MCPServerStdio | MCPServerStreamableHTTP | MCPServerSSE
]
Deprecated

load_mcp_servers is deprecated and will be removed in v2. Use pydantic_ai.mcp.load_mcp_toolsets instead — same JSON config shape, returns MCPToolset instances wrapped with their server name as a tool prefix.

Load MCP servers from a configuration file.

Environment variables can be referenced in the configuration file using:

  • ${VAR_NAME} syntax - expands to the value of VAR_NAME, raises error if not defined
  • ${VAR_NAME:-default} syntax - expands to VAR_NAME if set, otherwise uses the default value

Parameters:

Name Type Description Default
config_path str | Path

The path to the configuration file.

required

Returns:

Type Description
list[MCPServerStdio | MCPServerStreamableHTTP | MCPServerSSE]

A list of MCP servers.

Raises:

Type Description
FileNotFoundError

If the configuration file does not exist.

ValidationError

If the configuration file does not match the schema.

ValueError

If an environment variable referenced in the configuration is not defined and no default value is provided.

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
@deprecated(
    '`load_mcp_servers` is deprecated and will be removed in v2. '
    'Use `pydantic_ai.mcp.load_mcp_toolsets` instead — same JSON config shape, returns `MCPToolset` '
    'instances wrapped with their server name as a tool prefix.'
)
def load_mcp_servers(
    config_path: str | Path,
) -> list[MCPServerStdio | MCPServerStreamableHTTP | MCPServerSSE]:  # pyright: ignore[reportDeprecated]
    """Build MCP server instances from a JSON configuration file.

    The file may reference environment variables in two forms:

    - `${VAR_NAME}` — replaced by the value of `VAR_NAME`; an error is raised if it is not defined
    - `${VAR_NAME:-default}` — replaced by `VAR_NAME` if set, otherwise by the default value

    Each returned server has its `id` and `tool_prefix` set to its key in the config.

    Args:
        config_path: The path to the configuration file.

    Returns:
        A list of MCP servers, in the order they appear in the configuration.

    Raises:
        FileNotFoundError: If the configuration file does not exist.
        ValidationError: If the configuration file does not match the schema.
        ValueError: If an environment variable referenced in the configuration is not defined and no default value is provided.
    """
    path = Path(config_path)
    if not path.exists():
        raise FileNotFoundError(f'Config file {path} not found')

    raw_config = pydantic_core.from_json(path.read_bytes())
    expanded_config = _expand_env_vars(raw_config)

    # Validation instantiates the deprecated `MCPServer*` classes. Their warnings are muted on
    # purpose: this function is deprecated itself and those classes are what it returns.
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', r'`MCPServer\w+` is deprecated', DeprecationWarning)
        parsed = _MCPServerConfig.model_validate(expanded_config)

    # Tag every server with its config key, then hand them back in config order.
    for server_name, server in parsed.mcp_servers.items():
        server.id = server_name
        server.tool_prefix = server_name

    return list(parsed.mcp_servers.values())

load_mcp_toolsets

load_mcp_toolsets(
    config_path: str | Path,
) -> list[AbstractToolset[Any]]

Load MCPToolsets from a configuration file.

The configuration file uses the same mcpServers JSON shape as Claude Desktop, Cursor, and the MCP specification. Each server entry produces one MCPToolset, wrapped in a PrefixedToolset using the server's name as prefix to disambiguate tools across multiple servers.

Environment variables can be referenced in the configuration file using:

  • ${VAR_NAME} syntax — expands to the value of VAR_NAME, raises if not defined
  • ${VAR_NAME:-default} syntax — expands to VAR_NAME if set, otherwise the default

Parameters:

Name Type Description Default
config_path str | Path

Path to the JSON configuration file.

required

Returns:

Type Description
list[AbstractToolset[Any]]

A list of toolsets, one per server in the config file, each prefixed with the server name.

Raises:

Type Description
FileNotFoundError

If the configuration file does not exist.

ValidationError

If the configuration file does not match the schema.

ValueError

If an environment variable referenced in the configuration is not defined and no default is provided.

ImportError

If the fastmcp client isn't installed. Install the mcp extra (which pulls fastmcp-slim[client]): pip install "pydantic-ai-slim[mcp]".

Source code in pydantic_ai_slim/pydantic_ai/mcp.py
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
2592
2593
2594
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607
2608
2609
2610
2611
2612
def load_mcp_toolsets(config_path: str | Path) -> list[AbstractToolset[Any]]:
    """Build prefixed `MCPToolset`s from a JSON configuration file.

    The file follows the `mcpServers` JSON shape shared by Claude Desktop, Cursor, and the MCP
    specification. Every server entry becomes one [`MCPToolset`][pydantic_ai.mcp.MCPToolset]
    whose `id` is the server's name, wrapped via `.prefixed(name)` in a
    [`PrefixedToolset`][pydantic_ai.toolsets.PrefixedToolset] so that tools from different
    servers do not collide.

    Environment variables can be referenced in the configuration file using:

    - `${VAR_NAME}` syntax — expands to the value of `VAR_NAME`, raises if not defined
    - `${VAR_NAME:-default}` syntax — expands to `VAR_NAME` if set, otherwise the default

    Args:
        config_path: Path to the JSON configuration file.

    Returns:
        One toolset per server in the config file, each prefixed with the server name.

    Raises:
        FileNotFoundError: If the configuration file does not exist.
        ValidationError: If the configuration file does not match the schema.
        ValueError: If an environment variable referenced in the configuration is not defined and
            no default is provided.
        ImportError: If the fastmcp client isn't installed. Install the `mcp` extra (which pulls
            `fastmcp-slim[client]`): `pip install "pydantic-ai-slim[mcp]"`.
    """
    # Dependency check runs first, before the file is touched.
    _require_fastmcp()

    path = Path(config_path)
    if not path.exists():
        raise FileNotFoundError(f'Config file {path} not found')

    raw_config = pydantic_core.from_json(path.read_bytes())
    expanded_config = _expand_env_vars(raw_config)

    # The schema validates into the deprecated `MCPServer*` classes. They serve only as a
    # source of connection settings for the new toolsets, so their warnings are muted.
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', r'`MCPServer\w+` is deprecated', DeprecationWarning)
        parsed = _MCPServerConfig.model_validate(expanded_config)

    prefixed_toolsets: list[AbstractToolset[Any]] = []
    for server_name, server in parsed.mcp_servers.items():
        toolset: MCPToolset[Any]
        if isinstance(server, MCPServerStdio):  # pyright: ignore[reportDeprecated]
            # The working directory is passed on as a string when one is configured.
            working_dir = None if server.cwd is None else str(server.cwd)
            stdio_transport = StdioTransport(
                command=server.command,
                args=list(server.args),
                env=server.env,
                cwd=working_dir,
            )
            toolset = MCPToolset(stdio_transport, id=server_name)
        elif isinstance(server, _MCPServerHTTP):
            toolset = MCPToolset(server.url, id=server_name, headers=server.headers)
        else:  # pragma: no cover
            assert_never(server)
        prefixed_toolsets.append(toolset.prefixed(server_name))

    return prefixed_toolsets