← requests / tests/test_requests.py
| 1 | """Tests for Requests.""" |
| 2 | |
| 3 | import collections |
| 4 | import contextlib |
| 5 | import io |
| 6 | import json |
| 7 | import os |
| 8 | import pickle |
| 9 | import re |
| 10 | import tempfile |
| 11 | import threading |
| 12 | import warnings |
| 13 | from unittest import mock |
| 14 | |
| 15 | import pytest |
| 16 | import urllib3 |
| 17 | from urllib3.util import Timeout as Urllib3Timeout |
| 18 | |
| 19 | import requests |
| 20 | from requests.adapters import HTTPAdapter |
| 21 | from requests.auth import HTTPDigestAuth, _basic_auth_str |
| 22 | from requests.compat import ( |
| 23 | JSONDecodeError, |
| 24 | Morsel, |
| 25 | MutableMapping, |
| 26 | builtin_str, |
| 27 | cookielib, |
| 28 | getproxies, |
| 29 | is_urllib3_1, |
| 30 | urlparse, |
| 31 | ) |
| 32 | from requests.cookies import cookiejar_from_dict, morsel_to_cookie |
| 33 | from requests.exceptions import ( |
| 34 | ChunkedEncodingError, |
| 35 | ConnectionError, |
| 36 | ConnectTimeout, |
| 37 | ContentDecodingError, |
| 38 | InvalidHeader, |
| 39 | InvalidProxyURL, |
| 40 | InvalidSchema, |
| 41 | InvalidURL, |
| 42 | MissingSchema, |
| 43 | ProxyError, |
| 44 | ReadTimeout, |
| 45 | RequestException, |
| 46 | RetryError, |
| 47 | Timeout, |
| 48 | TooManyRedirects, |
| 49 | UnrewindableBodyError, |
| 50 | ) |
| 51 | from requests.exceptions import SSLError as RequestsSSLError |
| 52 | from requests.hooks import default_hooks |
| 53 | from requests.models import PreparedRequest, urlencode |
| 54 | from requests.sessions import SessionRedirectMixin |
| 55 | from requests.structures import CaseInsensitiveDict |
| 56 | |
| 57 | from . import SNIMissingWarning |
| 58 | from .compat import StringIO |
| 59 | from .testserver.server import TLSServer, consume_socket_content |
| 60 | from .utils import override_environ |
| 61 | |
# Requests to this URL should always fail with a connection timeout (nothing
# is listening on that port).
TARPIT = "http://10.255.255.1"

# Proxy URL used by the proxy tests instead of TARPIT, so they do not have to
# wait out the TARPIT connection timeout.
INVALID_PROXY = "http://localhost:1"
| 68 | |
# HAS_MODERN_SSL: True when ``SSLContext`` can be imported from ``ssl``.
try:
    from ssl import SSLContext
except ImportError:
    HAS_MODERN_SSL = False
else:
    # Only the success of the import matters; drop the probe name again.
    del SSLContext
    HAS_MODERN_SSL = True

# HAS_PYOPENSSL: True when the ``requests`` package exposes ``pyopenssl``.
HAS_PYOPENSSL = hasattr(requests, "pyopenssl")
| 82 | |
| 83 | |
| 84 | class TestRequests: |
| 85 | digest_auth_algo = ("MD5", "SHA-256", "SHA-512") |
| 86 | |
def test_entry_points(self):
    """Touch each public entry point so a missing one raises AttributeError."""
    requests.session
    for verb in ("get", "head"):
        getattr(requests.session(), verb)
    for name in ("get", "head", "put", "patch", "post"):
        getattr(requests, name)
    # Not really an entry point, but people rely on it.
    from requests.packages.urllib3.poolmanager import PoolManager  # noqa:F401
| 98 | |
@pytest.mark.parametrize(
    "exception, url",
    [
        (MissingSchema, "hiwpefhipowhefopw"),
        (InvalidSchema, "localhost:3128"),
        (InvalidSchema, "localhost.localdomain:3128/"),
        (InvalidSchema, "10.122.1.1:3128/"),
        (InvalidURL, "http://"),
        (InvalidURL, "http://*example.com"),
        (InvalidURL, "http://.example.com"),
    ],
)
def test_invalid_url(self, exception, url):
    """Each malformed URL makes ``requests.get`` raise the paired exception."""
    with pytest.raises(exception):
        requests.get(url)
| 114 | |
def test_basic_building(self):
    """A Request filled in attribute-by-attribute prepares its URL and body."""
    request = requests.Request()
    request.url = "http://kennethreitz.org/"
    request.data = {"life": "42"}

    prepared = request.prepare()

    assert prepared.url == request.url
    assert prepared.body == "life=42"
| 123 | |
@pytest.mark.parametrize("method", ("GET", "HEAD"))
def test_no_content_length(self, httpbin, method):
    """GET and HEAD requests without a body get no Content-Length header."""
    url = httpbin(method.lower())
    prepared = requests.Request(method, url).prepare()
    assert "Content-Length" not in prepared.headers
| 128 | |
@pytest.mark.parametrize("method", ("POST", "PUT", "PATCH", "OPTIONS"))
def test_no_body_content_length(self, httpbin, method):
    """Body-capable methods sent without data are prepared with Content-Length 0."""
    url = httpbin(method.lower())
    prepared = requests.Request(method, url).prepare()
    assert prepared.headers["Content-Length"] == "0"
| 133 | |
@pytest.mark.parametrize("method", ("POST", "PUT", "PATCH", "OPTIONS"))
def test_empty_content_length(self, httpbin, method):
    """An empty-string body is prepared with Content-Length 0."""
    url = httpbin(method.lower())
    prepared = requests.Request(method, url, data="").prepare()
    assert prepared.headers["Content-Length"] == "0"
| 138 | |
def test_override_content_length(self, httpbin):
    """A caller-supplied Content-Length header survives request preparation."""
    custom_headers = {"Content-Length": "not zero"}
    request = requests.Request("POST", httpbin("post"), headers=custom_headers)
    prepared = request.prepare()
    assert "Content-Length" in prepared.headers
    assert prepared.headers["Content-Length"] == "not zero"
| 144 | |
def test_path_is_not_double_encoded(self):
    """A space in the path is percent-encoded exactly once."""
    raw_url = "http://0.0.0.0/get/test case"
    prepared = requests.Request("GET", raw_url).prepare()
    assert prepared.path_url == "/get/test%20case"
| 149 | |
@pytest.mark.parametrize(
    "url, expected",
    [
        (
            "http://example.com/path#fragment",
            "http://example.com/path?a=b#fragment",
        ),
        (
            "http://example.com/path?key=value#fragment",
            "http://example.com/path?key=value&a=b#fragment",
        ),
    ],
)
def test_params_are_added_before_fragment(self, url, expected):
    """Extra query parameters are placed ahead of the URL fragment."""
    extra_params = {"a": "b"}
    prepared = requests.Request("GET", url, params=extra_params).prepare()
    assert prepared.url == expected
| 166 | |
def test_params_original_order_is_preserved_by_default(self):
    """Parameters from an OrderedDict appear in the URL in insertion order."""
    ordered_params = collections.OrderedDict(
        [("z", 1), ("a", 1), ("k", 1), ("d", 1)]
    )
    sess = requests.Session()
    req = requests.Request("GET", "http://example.com/", params=ordered_params)
    prepared = sess.prepare_request(req)
    assert prepared.url == "http://example.com/?z=1&a=1&k=1&d=1"
| 177 | |
def test_params_bytes_are_encoded(self):
    """A bytes ``params`` value ends up as the URL's query string."""
    req = requests.Request("GET", "http://example.com", params=b"test=foo")
    prepared = req.prepare()
    assert prepared.url == "http://example.com/?test=foo"
| 183 | |
def test_binary_put(self):
    """An encoded (bytes) payload is still bytes after preparation."""
    payload = "ööö".encode()
    prepared = requests.Request("PUT", "http://example.com", data=payload).prepare()
    assert isinstance(prepared.body, bytes)
| 189 | |
def test_whitespaces_are_removed_from_url(self):
    """Leading whitespace in a URL is stripped during preparation."""
    # Test for issue #3696
    padded_url = " http://example.com"
    prepared = requests.Request("GET", padded_url).prepare()
    assert prepared.url == "http://example.com/"
| 194 | |
@pytest.mark.parametrize("scheme", ("http://", "HTTP://", "hTTp://", "HttP://"))
def test_mixed_case_scheme_acceptable(self, httpbin, scheme):
    """The URL scheme is accepted regardless of letter case."""
    sess = requests.Session()
    sess.proxies = getproxies()
    parsed = urlparse(httpbin("get"))
    # Rebuild the httpbin URL with the parametrized scheme spelling.
    url = f"{scheme}{parsed.netloc}{parsed.path}"
    prepared = requests.Request("GET", url).prepare()
    response = sess.send(prepared)
    assert response.status_code == 200, f"failed for scheme {scheme}"
| 204 | |
def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin):
    """A manually prepared GET sent through a Session returns 200."""
    request = requests.Request("GET", httpbin("get"))
    sess = requests.Session()
    sess.proxies = getproxies()

    response = sess.send(request.prepare())

    assert response.status_code == 200
| 213 | |
def test_HTTP_302_ALLOW_REDIRECT_GET(self, httpbin):
    """A GET follows a single 302 and records it in the history."""
    response = requests.get(httpbin("redirect", "1"))
    first_hop = response.history[0]
    assert response.status_code == 200
    assert first_hop.status_code == 302
    assert first_hop.is_redirect
| 219 | |
def test_redirect_history_no_self_reference(self, httpbin):
    """Each history entry's own history holds only the hops that preceded it."""
    final = requests.get(httpbin("redirect", "3"))
    assert final.status_code == 200
    assert len(final.history) == 3
    for position, hop in enumerate(final.history):
        assert hop not in hop.history
        assert hop.history == final.history[:position]
| 227 | |
def test_HTTP_307_ALLOW_REDIRECT_POST(self, httpbin):
    """A POST body is re-sent when following a 307 redirect."""
    redirect_params = {"url": "post", "status_code": 307}
    response = requests.post(
        httpbin("redirect-to"), data="test", params=redirect_params
    )
    first_hop = response.history[0]
    assert response.status_code == 200
    assert first_hop.status_code == 307
    assert first_hop.is_redirect
    assert response.json()["data"] == "test"
| 238 | |
def test_HTTP_307_ALLOW_REDIRECT_POST_WITH_SEEKABLE(self, httpbin):
    """A seekable file-like POST body is re-sent across a 307 redirect."""
    payload = b"test"
    redirect_params = {"url": "post", "status_code": 307}
    response = requests.post(
        httpbin("redirect-to"), data=io.BytesIO(payload), params=redirect_params
    )
    first_hop = response.history[0]
    assert response.status_code == 200
    assert first_hop.status_code == 307
    assert first_hop.is_redirect
    assert response.json()["data"] == payload.decode("utf-8")
| 250 | |
def test_HTTP_302_TOO_MANY_REDIRECTS(self, httpbin):
    """With default limits, a 50-hop chain raises TooManyRedirects after 30 hops."""
    start_url = httpbin("relative-redirect", "50")
    try:
        requests.get(start_url)
    except TooManyRedirects as exc:
        # Counting down from 50, thirty followed hops end at 20.
        stopped_at = httpbin("relative-redirect", "20")
        assert exc.request.url == stopped_at
        assert exc.response.url == stopped_at
        assert len(exc.response.history) == 30
    else:
        pytest.fail("Expected redirect to raise TooManyRedirects but it did not")
| 261 | |
def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, httpbin):
    """A session-level ``max_redirects`` of 5 stops the chain after 5 hops."""
    sess = requests.session()
    sess.max_redirects = 5
    start_url = httpbin("relative-redirect", "50")
    try:
        sess.get(start_url)
    except TooManyRedirects as exc:
        # Counting down from 50, five followed hops end at 45.
        stopped_at = httpbin("relative-redirect", "45")
        assert exc.request.url == stopped_at
        assert exc.response.url == stopped_at
        assert len(exc.response.history) == 5
    else:
        pytest.fail(
            "Expected custom max number of redirects to be respected but was not"
        )
| 276 | |
def test_http_301_changes_post_to_get(self, httpbin):
    """A POST answered with 301 is followed using GET."""
    response = requests.post(httpbin("status", "301"))
    first_hop = response.history[0]
    assert response.status_code == 200
    assert response.request.method == "GET"
    assert first_hop.status_code == 301
    assert first_hop.is_redirect
| 283 | |
def test_http_301_doesnt_change_head_to_get(self, httpbin):
    """A HEAD answered with 301 is followed as HEAD, not rewritten to GET."""
    # HEAD does not follow redirects unless asked to, hence allow_redirects.
    r = requests.head(httpbin("status", "301"), allow_redirects=True)
    assert r.status_code == 200
    assert r.request.method == "HEAD"
    assert r.history[0].status_code == 301
    assert r.history[0].is_redirect
| 291 | |
def test_http_302_changes_post_to_get(self, httpbin):
    """A POST answered with 302 is followed using GET."""
    response = requests.post(httpbin("status", "302"))
    first_hop = response.history[0]
    assert response.status_code == 200
    assert response.request.method == "GET"
    assert first_hop.status_code == 302
    assert first_hop.is_redirect
| 298 | |
def test_http_302_doesnt_change_head_to_get(self, httpbin):
    """A HEAD answered with 302 is followed as HEAD, not rewritten to GET."""
    response = requests.head(httpbin("status", "302"), allow_redirects=True)
    first_hop = response.history[0]
    assert response.status_code == 200
    assert response.request.method == "HEAD"
    assert first_hop.status_code == 302
    assert first_hop.is_redirect
| 305 | |
def test_http_303_changes_post_to_get(self, httpbin):
    """A POST answered with 303 is followed using GET."""
    response = requests.post(httpbin("status", "303"))
    first_hop = response.history[0]
    assert response.status_code == 200
    assert response.request.method == "GET"
    assert first_hop.status_code == 303
    assert first_hop.is_redirect
| 312 | |
def test_http_303_doesnt_change_head_to_get(self, httpbin):
    """A HEAD answered with 303 is followed as HEAD, not rewritten to GET."""
    response = requests.head(httpbin("status", "303"), allow_redirects=True)
    first_hop = response.history[0]
    assert response.status_code == 200
    assert response.request.method == "HEAD"
    assert first_hop.status_code == 303
    assert first_hop.is_redirect
| 319 | |
def test_header_and_body_removal_on_redirect(self, httpbin):
    """A POST redirected with 302 loses its body and body-describing headers."""
    session = requests.Session()
    prepared = session.prepare_request(
        requests.Request("POST", httpbin("post"), data={"test": "data"})
    )
    response = session.send(prepared)

    # Mimic a redirect response
    response.status_code = 302
    response.headers["location"] = "get"

    # Run request through resolve_redirects
    followed = next(session.resolve_redirects(response, prepared))
    assert followed.request.body is None
    for name in ("Content-Length", "Content-Type"):
        assert name not in followed.request.headers
| 336 | |
def test_transfer_enc_removal_on_redirect(self, httpbin):
    """A streamed POST redirected with 302 loses its body and transfer headers."""
    session = requests.Session()
    # A generator body has no known length, so it is sent with Transfer-Encoding.
    streamed_body = (b"x" for _ in range(1))
    prepared = session.prepare_request(
        requests.Request("POST", httpbin("post"), data=streamed_body)
    )
    assert "Transfer-Encoding" in prepared.headers

    # Create Response to avoid https://github.com/kevin1024/pytest-httpbin/issues/33
    response = requests.Response()
    response.raw = io.BytesIO(b"the content")
    response.request = prepared
    response.raw.release_conn = lambda *args: args

    # Mimic a redirect response
    response.status_code = 302
    response.headers["location"] = httpbin("get")

    # Run request through resolve_redirect
    followed = next(session.resolve_redirects(response, prepared))
    assert followed.request.body is None
    for name in ("Transfer-Encoding", "Content-Type"):
        assert name not in followed.request.headers
| 359 | |
def test_fragment_maintained_on_redirect(self, httpbin):
    """The URL fragment of the original request is carried across a redirect."""
    fragment = "#view=edit&token=hunter2"
    start_url = httpbin("redirect-to?url=get") + fragment

    response = requests.get(start_url)

    assert len(response.history) > 0
    assert response.history[0].request.url == start_url
    assert response.url == httpbin("get") + fragment
| 367 | |
def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin):
    """A custom User-agent header is echoed back by the user-agent endpoint."""
    custom_headers = {"User-agent": "Mozilla/5.0"}

    response = requests.get(httpbin("user-agent"), headers=custom_headers)

    assert custom_headers["User-agent"] in response.text
    assert response.status_code == 200
| 375 | |
def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self, httpbin):
    """Query parameters in the URL and in ``params=`` can be combined."""
    custom_headers = {"User-agent": "Mozilla/5.0"}
    url_with_query = httpbin("get") + "?test=true"

    response = requests.get(
        url_with_query, params={"q": "test"}, headers=custom_headers
    )
    assert response.status_code == 200
| 383 | |
def test_set_cookie_on_301(self, httpbin):
    """A cookie set by the cookies/set endpoint is stored on the session."""
    sess = requests.session()
    sess.get(httpbin("cookies/set?foo=bar"))
    assert sess.cookies["foo"] == "bar"
| 389 | |
def test_cookie_sent_on_redirect(self, httpbin):
    """Session cookies are sent on the request that follows a redirect."""
    sess = requests.session()
    sess.get(httpbin("cookies/set?foo=bar"))
    # redirects to httpbin('get')
    response = sess.get(httpbin("redirect/1"))
    echoed_headers = response.json()["headers"]
    assert "Cookie" in echoed_headers
| 395 | |
def test_cookie_removed_on_expire(self, httpbin):
    """A Set-Cookie with an expiry in the past removes the session cookie."""
    sess = requests.session()
    sess.get(httpbin("cookies/set?foo=bar"))
    assert sess.cookies["foo"] == "bar"

    expiring = {"Set-Cookie": "foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT"}
    sess.get(httpbin("response-headers"), params=expiring)
    assert "foo" not in sess.cookies
| 405 | |
def test_cookie_quote_wrapped(self, httpbin):
    """A quoted cookie value keeps its surrounding quotes in the jar."""
    sess = requests.session()
    sess.get(httpbin('cookies/set?foo="bar:baz"'))
    assert sess.cookies["foo"] == '"bar:baz"'
| 410 | |
def test_cookie_persists_via_api(self, httpbin):
    """Per-request cookies are sent both before and after a redirect."""
    sess = requests.session()
    response = sess.get(httpbin("redirect/1"), cookies={"foo": "bar"})
    final_request = response.request
    first_request = response.history[0].request
    assert "foo" in final_request.headers["Cookie"]
    assert "foo" in first_request.headers["Cookie"]
| 416 | |
def test_request_cookie_overrides_session_cookie(self, httpbin):
    """A per-request cookie wins over a same-named session cookie for that call."""
    sess = requests.session()
    sess.cookies["foo"] = "bar"
    response = sess.get(httpbin("cookies"), cookies={"foo": "baz"})
    echoed_cookies = response.json()["cookies"]
    assert echoed_cookies["foo"] == "baz"
    # Session cookie should not be modified
    assert sess.cookies["foo"] == "bar"
| 424 | |
def test_request_cookies_not_persisted(self, httpbin):
    """Per-request cookies do not leak into the session's cookie jar."""
    sess = requests.session()
    sess.get(httpbin("cookies"), cookies={"foo": "baz"})
    # Sending a request with cookies should not add cookies to the session
    assert not sess.cookies
| 430 | |
def test_generic_cookiejar_works(self, httpbin):
    """A plain ``cookielib.CookieJar`` can serve as the session's jar."""
    jar = cookielib.CookieJar()
    cookiejar_from_dict({"foo": "bar"}, jar)
    sess = requests.session()
    sess.cookies = jar

    response = sess.get(httpbin("cookies"))

    # Make sure the cookie was sent
    assert response.json()["cookies"]["foo"] == "bar"
    # Make sure the session cj is still the custom one
    assert sess.cookies is jar
| 441 | |
def test_param_cookiejar_works(self, httpbin):
    """A plain ``cookielib.CookieJar`` can be passed as ``cookies=``."""
    jar = cookielib.CookieJar()
    cookiejar_from_dict({"foo": "bar"}, jar)
    sess = requests.session()

    response = sess.get(httpbin("cookies"), cookies=jar)

    # Make sure the cookie was sent
    assert response.json()["cookies"]["foo"] == "bar"
| 449 | |
def test_cookielib_cookiejar_on_redirect(self, httpbin):
    """resolve_redirects merges cookies into a non-RequestsCookieJar jar.

    The request carries a plain ``cookielib.CookieJar``; following a
    redirect must neither fail nor swap the jar for a RequestsCookieJar.

    See GH #3579
    """
    request_jar = cookiejar_from_dict({"foo": "bar"}, cookielib.CookieJar())
    sess = requests.Session()
    sess.cookies = cookiejar_from_dict({"cookie": "tasty"})

    # Prepare request without using Session
    prepared = requests.Request(
        "GET", httpbin("headers"), cookies=request_jar
    ).prepare()

    # Send request and simulate redirect
    response = sess.send(prepared)
    response.status_code = 302
    response.headers["location"] = httpbin("get")
    response = next(sess.resolve_redirects(response, prepared))

    # Verify CookieJar isn't being converted to RequestsCookieJar
    followed_jar = response.request._cookies
    assert isinstance(prepared._cookies, cookielib.CookieJar)
    assert isinstance(followed_jar, cookielib.CookieJar)
    assert not isinstance(followed_jar, requests.cookies.RequestsCookieJar)

    # Both the request's own cookie and the session cookie must be present.
    merged = {cookie.name: cookie.value for cookie in followed_jar}
    assert merged["foo"] == "bar"
    assert merged["cookie"] == "tasty"
| 481 | |
def test_requests_in_history_are_not_overridden(self, httpbin):
    """Every history entry keeps the request that produced it."""
    final = requests.get(httpbin("redirect/3"))
    response_urls = []
    request_urls = []
    for hop in final.history:
        response_urls.append(hop.url)
        request_urls.append(hop.request.url)
    assert response_urls == request_urls
| 487 | |
def test_history_is_always_a_list(self, httpbin):
    """Response.history is a list both with and without redirects."""
    direct = requests.get(httpbin("get"))
    assert isinstance(direct.history, list)

    redirected = requests.get(httpbin("redirect/1"))
    assert isinstance(redirected.history, list)
    assert not isinstance(redirected.history, tuple)
| 495 | |
def test_headers_on_session_with_None_are_not_sent(self, httpbin):
    """Session headers whose value is None are left out of prepared requests."""
    sess = requests.Session()
    sess.headers["Accept-Encoding"] = None
    prepared = sess.prepare_request(requests.Request("GET", httpbin("get")))
    assert "Accept-Encoding" not in prepared.headers
| 503 | |
def test_headers_preserve_order(self, httpbin):
    """Header order is kept when session and request headers are OrderedDicts.

    Session headers come first; a request header that overrides a session
    header ("Second") keeps the session header's position.
    """
    sess = requests.Session()
    sess.headers = collections.OrderedDict(
        [("Accept-Encoding", "identity"), ("First", "1"), ("Second", "2")]
    )
    request_headers = collections.OrderedDict(
        [("Third", "3"), ("Fourth", "4"), ("Fifth", "5"), ("Second", "222")]
    )
    prepared = sess.prepare_request(
        requests.Request("GET", httpbin("get"), headers=request_headers)
    )
    actual = list(prepared.headers.items())
    expected = (
        ("Accept-Encoding", "identity"),
        ("First", "1"),
        ("Second", "222"),
        ("Third", "3"),
        ("Fourth", "4"),
        ("Fifth", "5"),
    )
    for position, pair in enumerate(expected):
        assert actual[position] == pair
| 523 | |
@pytest.mark.parametrize("key", ("User-agent", "user-agent"))
def test_user_agent_transfers(self, httpbin, key):
    """A custom user agent is sent whichever casing the header name uses."""
    agent = "Mozilla/5.0 (github.com/psf/requests)"
    custom_headers = {key: agent}

    response = requests.get(httpbin("user-agent"), headers=custom_headers)
    assert custom_headers[key] in response.text
| 530 | |
def test_HTTP_200_OK_HEAD(self, httpbin):
    """A HEAD request to the get endpoint returns 200."""
    response = requests.head(httpbin("get"))
    assert response.status_code == 200
| 534 | |
def test_HTTP_200_OK_PUT(self, httpbin):
    """A PUT request to the put endpoint returns 200."""
    response = requests.put(httpbin("put"))
    assert response.status_code == 200
| 538 | |
def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin):
    """A (user, pass) tuple works as basic auth per request and per session."""
    credentials = ("user", "pass")
    protected_url = httpbin("basic-auth", "user", "pass")

    # Credentials passed on the call itself.
    authed = requests.get(protected_url, auth=credentials)
    assert authed.status_code == 200

    # No credentials at all.
    anonymous = requests.get(protected_url)
    assert anonymous.status_code == 401

    # Credentials configured on the session.
    sess = requests.session()
    sess.auth = credentials
    via_session = sess.get(protected_url)
    assert via_session.status_code == 200
| 553 | |
@pytest.mark.parametrize(
    "username, password",
    [
        ("user", "pass"),
        ("имя".encode(), "пароль".encode()),
        (42, 42),
        (None, None),
    ],
)
def test_set_basicauth(self, httpbin, username, password):
    """The prepared Authorization header equals ``_basic_auth_str`` output."""
    request = requests.Request("GET", httpbin("get"), auth=(username, password))
    prepared = request.prepare()

    expected_header = _basic_auth_str(username, password)
    assert prepared.headers["Authorization"] == expected_header
| 571 | |
def test_basicauth_encodes_byte_strings(self):
    """Byte-string credentials are encoded from their raw bytes.

    b'test' must be treated as the bytes "test", not as the text
    "b'test'" that ``str()`` would produce in Python 3.
    """
    credentials = (b"\xc5\xafsername", b"test\xc6\xb6")
    prepared = requests.Request(
        "GET", "http://localhost", auth=credentials
    ).prepare()

    assert prepared.headers["Authorization"] == "Basic xa9zZXJuYW1lOnRlc3TGtg=="
| 581 | |
@pytest.mark.parametrize(
    "url, exception",
    [
        # Connecting to an unknown domain should raise a ConnectionError
        ("http://doesnotexist.google.com", ConnectionError),
        # Connecting to an invalid port should raise a ConnectionError
        ("http://localhost:1", ConnectionError),
        # Inputting a URL that cannot be parsed should raise an InvalidURL error
        ("http://fe80::5054:ff:fe5a:fc0", InvalidURL),
    ],
)
def test_errors(self, url, exception):
    """Unreachable or unparsable URLs raise the paired requests exception."""
    with pytest.raises(exception):
        requests.get(url, timeout=1)
| 596 | |
def test_proxy_error(self):
    """Any proxy-related failure surfaces as ProxyError."""
    # any proxy related error (address resolution, no route to host, etc) should result in a ProxyError
    broken_proxies = {"http": "non-resolvable-address"}
    with pytest.raises(ProxyError):
        requests.get("http://localhost:1", proxies=broken_proxies)
| 603 | |
def test_proxy_error_on_bad_url(self, httpbin, httpbin_secure):
    """Malformed proxy URLs are rejected with InvalidProxyURL."""
    # (target URL factory, proxies mapping) pairs, checked in order.
    cases = (
        (httpbin_secure, {"https": "http:/badproxyurl:3128"}),
        (httpbin, {"http": "http://:8080"}),
        (httpbin_secure, {"https": "https://"}),
        (httpbin, {"http": "http:///example.com:8080"}),
    )
    for make_url, bad_proxies in cases:
        with pytest.raises(InvalidProxyURL):
            requests.get(make_url(), proxies=bad_proxies)
| 616 | |
def test_respect_proxy_env_on_send_self_prepared_request(self, httpbin):
    """``http_proxy`` from the environment applies to a self-prepared request."""
    with override_environ(http_proxy=INVALID_PROXY), pytest.raises(ProxyError):
        sender = requests.Session()
        prepared = requests.Request("GET", httpbin()).prepare()
        sender.send(prepared)
| 623 | |
def test_respect_proxy_env_on_send_session_prepared_request(self, httpbin):
    """``http_proxy`` from the environment applies to a session-prepared request."""
    with override_environ(http_proxy=INVALID_PROXY), pytest.raises(ProxyError):
        sender = requests.Session()
        prepared = sender.prepare_request(requests.Request("GET", httpbin()))
        sender.send(prepared)
| 631 | |
def test_respect_proxy_env_on_send_with_redirects(self, httpbin):
    """``http_proxy`` from the environment applies when sending to a redirect URL.

    With ``http_proxy`` pointing at INVALID_PROXY, sending a self-prepared
    request for the redirect endpoint must raise ProxyError.
    """
    with override_environ(http_proxy=INVALID_PROXY):
        session = requests.Session()
        prepared = requests.Request("GET", httpbin("redirect/1")).prepare()
        # Only the send itself is allowed to satisfy the expected exception.
        with pytest.raises(ProxyError):
            session.send(prepared)
| 640 | |
def test_respect_proxy_env_on_get(self, httpbin):
    """``http_proxy`` from the environment applies to ``Session.get``."""
    with override_environ(http_proxy=INVALID_PROXY), pytest.raises(ProxyError):
        sender = requests.Session()
        sender.get(httpbin())
| 646 | |
def test_respect_proxy_env_on_request(self, httpbin):
    """``http_proxy`` from the environment applies to ``Session.request``."""
    with override_environ(http_proxy=INVALID_PROXY), pytest.raises(ProxyError):
        sender = requests.Session()
        sender.request(method="GET", url=httpbin())
| 652 | |
def test_proxy_authorization_preserved_on_request(self, httpbin):
    """A Proxy-Authorization header set on the session reaches the server."""
    expected_value = "Bearer XXX"
    sess = requests.Session()
    sess.headers.update({"Proxy-Authorization": expected_value})

    response = sess.request(method="GET", url=httpbin("get"))
    echoed_headers = response.json().get("headers", {})

    assert echoed_headers.get("Proxy-Authorization") == expected_value
| 661 | |
@pytest.mark.parametrize(
    "url,has_proxy_auth",
    [
        ("http://example.com", True),
        ("https://example.com", False),
    ],
)
def test_proxy_authorization_not_appended_to_https_request(
    self, url, has_proxy_auth
):
    """rebuild_proxies adds Proxy-Authorization for http URLs only."""
    proxy_map = {
        "http": "http://test:pass@localhost:8080",
        "https": "http://test:pass@localhost:8090",
    }
    sess = requests.Session()
    prepared = requests.Request("GET", url).prepare()
    sess.rebuild_proxies(prepared, proxy_map)

    header_present = "Proxy-Authorization" in prepared.headers
    assert header_present is has_proxy_auth
| 682 | |
def test_basicauth_with_netrc(self, httpbin):
    """netrc credentials are used by default and overridden by explicit auth."""
    good_credentials = ("user", "pass")
    bad_credentials = ("wronguser", "wrongpass")
    protected_url = httpbin("basic-auth", "user", "pass")

    def fake_netrc_lookup(url):
        return good_credentials

    # Swap the netrc lookup for the duration of the block; the original
    # function is put back on exit, even if an assertion fails.
    with mock.patch.object(
        requests.sessions, "get_netrc_auth", new=fake_netrc_lookup
    ):
        # Should use netrc and work.
        response = requests.get(protected_url)
        assert response.status_code == 200

        # Given auth should override and fail.
        response = requests.get(protected_url, auth=bad_credentials)
        assert response.status_code == 401

        sess = requests.session()

        # Should use netrc and work.
        response = sess.get(protected_url)
        assert response.status_code == 200

        # Given auth should override and fail.
        sess.auth = bad_credentials
        response = sess.get(protected_url)
        assert response.status_code == 401
| 717 | |
def test_basicauth_with_netrc_leak(self, httpbin):
    """netrc credentials for the userinfo part of a URL must not be used.

    The request URL has ``example.com:`` in front of the ``@`` and the
    httpbin server as its actual host. The netrc file has entries for both
    ``example.com`` (wrong credentials) and the httpbin host (right
    credentials); the request only succeeds if the latter are chosen.
    """
    url1 = httpbin("basic-auth", "user", "pass")
    host_and_path = url1[len("http://") :]
    domain = host_and_path.split(":")[0]
    url = f"http://example.com:@{host_and_path}"

    # delete=False: the file must outlive the ``with`` so it can be read
    # back through NETRC; it is removed explicitly in ``finally`` below.
    with tempfile.NamedTemporaryFile(mode="w", delete=False) as fp:
        fp.write("machine example.com\n")
        fp.write("login wronguser\n")
        fp.write("password wrongpass\n")
        fp.write(f"machine {domain}\n")
        fp.write("login user\n")
        fp.write("password pass\n")
        netrc_file = fp.name

    try:
        # patch.dict restores os.environ exactly, including removing NETRC
        # again when it was not set before the test.
        with mock.patch.dict(os.environ, {"NETRC": netrc_file}):
            # Should use netrc
            # Make sure that we don't use the example.com credentials
            # for the request
            r = requests.get(url)
            assert r.status_code == 200
    finally:
        os.unlink(netrc_file)
| 747 | |
def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
    """Digest auth succeeds per request and per session for every algorithm.

    For each algorithm in ``digest_auth_algo``: an authenticated GET returns
    200, an unauthenticated GET returns 401 with a WWW-Authenticate header,
    and a session configured with digest auth returns 200.
    """
    for authtype in self.digest_auth_algo:
        auth = HTTPDigestAuth("user", "pass")
        url = httpbin("digest-auth", "auth", "user", "pass", authtype, "never")

        r = requests.get(url, auth=auth)
        assert r.status_code == 200

        r = requests.get(url)
        assert r.status_code == 401
        # The 401 response must carry the digest challenge header.
        assert "WWW-Authenticate" in r.headers

        s = requests.session()
        s.auth = HTTPDigestAuth("user", "pass")
        r = s.get(url)
        assert r.status_code == 200
| 764 | |
def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
    """The digest endpoint's cookie is exposed on the unauthenticated response."""
    for algorithm in self.digest_auth_algo:
        protected_url = httpbin("digest-auth", "auth", "user", "pass", algorithm)
        digest = HTTPDigestAuth("user", "pass")

        challenge = requests.get(protected_url)
        assert challenge.cookies["fake"] == "fake_value"

        authed = requests.get(protected_url, auth=digest)
        assert authed.status_code == 200
| 774 | |
def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
    """Cookies received during the digest exchange are stored on the session."""
    for algorithm in self.digest_auth_algo:
        protected_url = httpbin("digest-auth", "auth", "user", "pass", algorithm)
        digest = HTTPDigestAuth("user", "pass")
        sess = requests.Session()
        sess.get(protected_url, auth=digest)
        assert sess.cookies["fake"] == "fake_value"
| 782 | |
def test_DIGEST_STREAM(self, httpbin):
    """With digest auth, ``stream`` decides whether the raw body is left unread."""
    for algorithm in self.digest_auth_algo:
        digest = HTTPDigestAuth("user", "pass")
        protected_url = httpbin("digest-auth", "auth", "user", "pass", algorithm)

        streamed = requests.get(protected_url, auth=digest, stream=True)
        assert streamed.raw.read() != b""

        eager = requests.get(protected_url, auth=digest, stream=False)
        assert eager.raw.read() == b""
| 793 | |
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
    """A wrong digest password yields 401 per request and per session."""
    for algorithm in self.digest_auth_algo:
        wrong_digest = HTTPDigestAuth("user", "wrongpass")
        protected_url = httpbin("digest-auth", "auth", "user", "pass", algorithm)

        # Wrong password passed on the call itself.
        response = requests.get(protected_url, auth=wrong_digest)
        assert response.status_code == 401

        # No credentials at all.
        response = requests.get(protected_url)
        assert response.status_code == 401

        # Wrong password configured on the session.
        sess = requests.session()
        sess.auth = wrong_digest
        response = sess.get(protected_url)
        assert response.status_code == 401
| 809 | |
def test_DIGESTAUTH_QUOTES_QOP_VALUE(self, httpbin):
    """The Authorization header sent for digest auth contains a quoted "auth"."""
    for algorithm in self.digest_auth_algo:
        digest = HTTPDigestAuth("user", "pass")
        protected_url = httpbin("digest-auth", "auth", "user", "pass", algorithm)

        response = requests.get(protected_url, auth=digest)
        sent_header = response.request.headers["Authorization"]
        assert '"auth"' in sent_header
| 817 | |
def test_POSTBIN_GET_POST_FILES(self, httpbin):
    """POST succeeds with no body, form data, a file upload and a raw string."""
    endpoint = httpbin("post")
    requests.post(endpoint).raise_for_status()

    form_response = requests.post(endpoint, data={"some": "data"})
    assert form_response.status_code == 200

    with open("requirements-dev.txt") as upload:
        file_response = requests.post(endpoint, files={"some": upload})
    assert file_response.status_code == 200

    raw_response = requests.post(endpoint, data='[{"some": "json"}]')
    assert raw_response.status_code == 200

    # A list of plain strings is rejected as a ``files`` value.
    with pytest.raises(ValueError):
        requests.post(endpoint, files=["bad file data"])
| 834 | |
def test_invalid_files_input(self, httpbin):
    """A ``None`` file entry is left out of the body; an integer entry is kept."""
    endpoint = httpbin("post")
    upload_spec = {"random-file-1": None, "random-file-2": 1}
    sent_body = requests.post(endpoint, files=upload_spec).request.body
    assert b'name="random-file-1"' not in sent_body
    assert b'name="random-file-2"' in sent_body
| 840 | |
def test_POSTBIN_SEEKED_OBJECT_WITH_NO_ITER(self, httpbin):
    """Only the data past the current position of a non-iterable stream is posted."""

    class TestStream:
        """File-like helper with ``__len__``/``read``/``tell``/``seek`` but no ``__iter__``."""

        def __init__(self, data):
            self.data = data.encode()
            self.length = len(self.data)
            self.index = 0

        def __len__(self):
            return self.length

        def read(self, size=None):
            start = self.index
            if not size:
                # No (or zero) size given: return everything that is left.
                self.index = self.length
                return self.data[start:]
            self.index = start + size
            return self.data[start : start + size]

        def tell(self):
            return self.index

        def seek(self, offset, where=0):
            # 0: absolute, 1: relative to the current index, 2: relative to the end.
            if where == 0:
                self.index = offset
                return
            if where == 1:
                self.index += offset
                return
            if where == 2:
                self.index = self.length + offset

    untouched = TestStream("test")
    full_post = requests.post(httpbin("post"), data=untouched)
    assert full_post.status_code == 200
    assert full_post.json()["data"] == "test"

    advanced = TestStream("test")
    advanced.seek(2)
    partial_post = requests.post(httpbin("post"), data=advanced)
    assert partial_post.status_code == 200
    assert partial_post.json()["data"] == "st"
| 881 | |
def test_POSTBIN_GET_POST_FILES_WITH_DATA(self, httpbin):
    """POST succeeds when form data and a file upload are sent together."""
    endpoint = httpbin("post")
    requests.post(endpoint).raise_for_status()

    form_only = requests.post(endpoint, data={"some": "data"})
    assert form_only.status_code == 200

    with open("requirements-dev.txt") as upload:
        form_and_file = requests.post(
            endpoint, data={"some": "data"}, files={"some": upload}
        )
    assert form_and_file.status_code == 200

    raw_string = requests.post(endpoint, data='[{"some": "json"}]')
    assert raw_string.status_code == 200

    # A list of plain strings is rejected as a ``files`` value.
    with pytest.raises(ValueError):
        requests.post(endpoint, files=["bad file data"])
| 898 | |
def test_post_with_custom_mapping(self, httpbin):
    """A user-defined ``MutableMapping`` given as ``data`` comes back as form fields."""

    class CustomMapping(MutableMapping):
        """``MutableMapping`` that delegates every operation to an inner dict."""

        def __init__(self, *args, **kwargs):
            self.data = dict(*args, **kwargs)

        def __getitem__(self, key):
            return self.data[key]

        def __setitem__(self, key, value):
            self.data[key] = value

        def __delitem__(self, key):
            del self.data[key]

        def __iter__(self):
            return iter(self.data)

        def __len__(self):
            return len(self.data)

    payload = CustomMapping({"some": "data"})
    endpoint = httpbin("post")
    response = requests.post(endpoint, data=payload)
    echoed_form = response.json().get("form")
    assert echoed_form == {"some": "data"}
| 923 | |
def test_conflicting_post_params(self, httpbin):
    """Passing a string body together with ``files`` raises ValueError."""
    endpoint = httpbin("post")
    with open("requirements-dev.txt") as upload, pytest.raises(ValueError):
        requests.post(endpoint, data='[{"some": "data"}]', files={"some": upload})
| 929 | |
def test_request_ok_set(self, httpbin):
    """``Response.ok`` is falsy for a 404 response."""
    not_found = requests.get(httpbin("status", "404"))
    assert not not_found.ok
| 933 | |
def test_status_raising(self, httpbin):
    """``raise_for_status`` raises HTTPError for a 404; a 500 response is not ``ok``."""
    not_found = requests.get(httpbin("status", "404"))
    with pytest.raises(requests.exceptions.HTTPError):
        not_found.raise_for_status()

    server_error = requests.get(httpbin("status", "500"))
    assert not server_error.ok
| 941 | |
def test_decompress_gzip(self, httpbin):
    """The content fetched from the gzip endpoint decodes as ASCII."""
    body = requests.get(httpbin("gzip")).content
    # ``decode`` raises UnicodeDecodeError if the bytes are not valid ASCII.
    body.decode("ascii")
| 945 | |
@pytest.mark.parametrize(
    "url, params",
    (
        ("/get", {"foo": "føø"}),
        ("/get", {"føø": "føø"}),
        ("/get", {"foo": "foo"}),
        ("ø", {"foo": "foo"}),
    ),
)
def test_unicode_get(self, httpbin, url, params):
    """GET requests with non-ASCII paths or query parameters do not raise.

    The cases cover a non-ASCII value, a non-ASCII key and value, a plain
    ASCII control, and a non-ASCII path. Each case is listed once; an
    identical repeat of the non-ASCII key/value case only ran the same
    request a second time.
    """
    requests.get(httpbin(url), params=params)
| 958 | |
def test_unicode_header_name(self, httpbin):
    """PUT a non-ASCII ``str`` body with an explicit Content-Type without raising."""
    endpoint = httpbin("put")
    content_headers = {"Content-Type": "application/octet-stream"}
    # compat.str is unicode.
    requests.put(endpoint, headers=content_headers, data="\xff")
| 965 | |
def test_pyopenssl_redirect(self, httpbin_secure, httpbin_ca_bundle):
    """Fetching a 301 over HTTPS, verified against the test CA bundle, does not raise."""
    redirecting_url = httpbin_secure("status", "301")
    requests.get(redirecting_url, verify=httpbin_ca_bundle)
| 968 | |
def test_invalid_ca_certificate_path(self, httpbin_secure):
    """A nonexistent ``verify`` path raises IOError whose message names the path."""
    INVALID_PATH = "/garbage"
    expected_message = f"Could not find a suitable TLS CA certificate bundle, invalid path: {INVALID_PATH}"

    with pytest.raises(IOError) as excinfo:
        requests.get(httpbin_secure(), verify=INVALID_PATH)

    assert str(excinfo.value) == expected_message
| 977 | |
def test_invalid_ssl_certificate_files(self, httpbin_secure):
    """Nonexistent client certificate or key paths raise IOError naming the path."""
    INVALID_PATH = "/garbage"

    # ``cert`` given as a single string: the certificate file is reported.
    with pytest.raises(IOError) as cert_error:
        requests.get(httpbin_secure(), cert=INVALID_PATH)
    cert_message = f"Could not find the TLS certificate file, invalid path: {INVALID_PATH}"
    assert str(cert_error.value) == cert_message

    # ``cert`` given as a pair whose second entry is bad: the key file is reported.
    with pytest.raises(IOError) as key_error:
        requests.get(httpbin_secure(), cert=(".", INVALID_PATH))
    key_message = f"Could not find the TLS key file, invalid path: {INVALID_PATH}"
    assert str(key_error.value) == key_message
| 992 | |
@pytest.mark.parametrize(
    "env, expected",
    (
        ({}, True),
        ({"REQUESTS_CA_BUNDLE": "/some/path"}, "/some/path"),
        ({"REQUESTS_CA_BUNDLE": ""}, True),
        ({"CURL_CA_BUNDLE": "/some/path"}, "/some/path"),
        ({"CURL_CA_BUNDLE": ""}, True),
        ({"REQUESTS_CA_BUNDLE": "", "CURL_CA_BUNDLE": ""}, True),
        (
            {"REQUESTS_CA_BUNDLE": "/some/path", "CURL_CA_BUNDLE": "/curl/path"},
            "/some/path",
        ),
        (
            {"REQUESTS_CA_BUNDLE": "", "CURL_CA_BUNDLE": "/curl/path"},
            "/curl/path",
        ),
    ),
)
def test_env_cert_bundles(self, httpbin, env, expected):
    """The merged ``verify`` setting reflects the CA bundle environment variables.

    ``env`` replaces ``os.environ`` for the duration of the merge;
    ``expected`` is the resulting ``verify`` value.
    """
    session = requests.Session()
    with mock.patch("os.environ", env):
        merged = session.merge_environment_settings(
            url=httpbin("get"),
            proxies={},
            stream=False,
            verify=True,
            cert=None,
        )
    assert merged["verify"] == expected
| 1025 | |
def test_http_with_certificate(self, httpbin):
    """Supplying ``cert`` on a request to the plain-HTTP fixture still returns 200."""
    response = requests.get(httpbin(), cert=".")
    assert response.status_code == 200
| 1029 | |
@pytest.mark.skipif(
    SNIMissingWarning is None,
    reason="urllib3 2.0 removed that warning and errors out instead",
)
def test_https_warnings(self, nosan_server):
    """warnings are emitted with requests.get"""
    _host, port, ca_bundle = nosan_server

    expected_names = ("SubjectAltNameWarning",)
    if not (HAS_MODERN_SSL or HAS_PYOPENSSL):
        # Without either, two further warnings are expected first.
        expected_names = ("SNIMissingWarning", "InsecurePlatformWarning") + expected_names

    with pytest.warns() as recorded:
        warnings.simplefilter("always")
        requests.get(f"https://localhost:{port}/", verify=ca_bundle)

    # ResourceWarning entries are not part of the comparison.
    seen_names = tuple(
        record.category.__name__
        for record in recorded
        if record.category.__name__ != "ResourceWarning"
    )
    assert seen_names == expected_names
| 1058 | |
def test_certificate_failure(self, httpbin_secure):
    """
    When underlying SSL problems occur, an SSLError is raised.
    """
    # Our local httpbin does not have a trusted CA, so this call will
    # fail if we use our default trust bundle.
    untrusted_url = httpbin_secure("status", "200")
    with pytest.raises(RequestsSSLError):
        requests.get(untrusted_url)
| 1067 | |
def test_urlencoded_get_query_multivalued_param(self, httpbin):
    """A list-valued query parameter appears once per value in the final URL."""
    multi_valued = {"test": ["foo", "baz"]}
    response = requests.get(httpbin("get"), params=multi_valued)
    assert response.status_code == 200
    assert response.url == httpbin("get?test=foo&test=baz")
| 1072 | |
def test_form_encoded_post_query_multivalued_element(self, httpbin):
    """A list-valued form field is encoded once per value in the prepared body."""
    request = requests.Request(
        method="POST",
        url=httpbin("post"),
        data={"test": ["foo", "baz"]},
    )
    prepared = request.prepare()
    assert prepared.body == "test=foo&test=baz"
| 1079 | |
def test_different_encodings_dont_break_post(self, httpbin):
    """A POST mixing query params, JSON-as-form data and a binary file returns 200."""
    form_fields = {"stuff": json.dumps({"a": 123})}
    query = {"blah": "asdf1234"}
    with open(__file__, "rb") as source:
        response = requests.post(
            httpbin("post"),
            data=form_fields,
            params=query,
            files={"file": ("test_requests.py", source)},
        )
    assert response.status_code == 200
| 1089 | |
@pytest.mark.parametrize(
    "data",
    (
        {"stuff": "ëlïxr"},
        {"stuff": "ëlïxr".encode()},
        {"stuff": "elixr"},
        {"stuff": b"elixr"},
    ),
)
def test_unicode_multipart_post(self, httpbin, data):
    """Multipart POSTs accept str and bytes field values, ASCII or not."""
    with open(__file__, "rb") as source:
        upload = {"file": ("test_requests.py", source)}
        response = requests.post(httpbin("post"), data=data, files=upload)
    assert response.status_code == 200
| 1107 | |
def test_unicode_multipart_post_fieldnames(self, httpbin):
    """A bytes field name is written as ``stuff``, not as its ``repr``."""
    source_path = os.path.splitext(__file__)[0] + ".py"
    with open(source_path, "rb") as source:
        request = requests.Request(
            method="POST",
            url=httpbin("post"),
            data={b"stuff": "elixr"},
            files={"file": ("test_requests.py", source)},
        )
        prepared = request.prepare()

    body = prepared.body
    assert b'name="stuff"' in body
    assert b"name=\"b'stuff'\"" not in body
| 1121 | |
def test_unicode_method_name(self, httpbin):
    """``requests.request`` with a ``str`` method uploads a file and returns 200."""
    with open(__file__, "rb") as source:
        response = requests.request(
            method="POST",
            url=httpbin("post"),
            files={"file": source},
        )
    assert response.status_code == 200
| 1131 | |
def test_unicode_method_name_with_request_object(self, httpbin):
    """A session-prepared request keeps a native-str method and can be sent."""
    session = requests.Session()
    with open(__file__, "rb") as source:
        request = requests.Request("POST", httpbin("post"), files={"file": source})
        prepared = session.prepare_request(request)

    method = prepared.method
    assert isinstance(method, builtin_str)
    assert method == "POST"

    response = session.send(prepared)
    assert response.status_code == 200
| 1143 | |
def test_non_prepared_request_error(self):
    """Sending an unprepared ``Request`` raises ValueError with a fixed message."""
    session = requests.Session()
    unprepared = requests.Request("POST", "/")

    with pytest.raises(ValueError) as excinfo:
        session.send(unprepared)

    message = str(excinfo.value)
    assert message == "You can only send PreparedRequests."
| 1151 | |
def test_custom_content_type(self, httpbin):
    """An explicit per-file content type is written into the multipart body."""
    form_fields = {"stuff": json.dumps({"a": 123})}
    with open(__file__, "rb") as first, open(__file__, "rb") as second:
        uploads = {
            "file1": ("test_requests.py", first),
            "file2": ("test_requests", second, "text/py-content-type"),
        }
        response = requests.post(httpbin("post"), data=form_fields, files=uploads)
    assert response.status_code == 200
    assert b"text/py-content-type" in response.request.body
| 1163 | |
def test_hook_receives_request_arguments(self, httpbin):
    """A response hook is called with a response and non-empty keyword arguments."""

    def hook(resp, **kwargs):
        # The assertions run inside the hook itself.
        assert resp is not None
        assert kwargs != {}

    session = requests.Session()
    request = requests.Request("GET", httpbin(), hooks={"response": hook})
    session.send(session.prepare_request(request))
| 1173 | |
def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
    """Without request-level hooks, the prepared request carries the session's hooks."""

    def hook(*args, **kwargs):
        pass

    session = requests.Session()
    session.hooks["response"].append(hook)

    prepared = session.prepare_request(requests.Request("GET", httpbin()))
    response_hooks = prepared.hooks["response"]
    assert response_hooks != []
    assert response_hooks == [hook]
| 1184 | |
def test_session_hooks_are_overridden_by_request_hooks(self, httpbin):
    """Request-level response hooks replace the session's hooks when preparing."""

    def hook1(*args, **kwargs):
        pass

    def hook2(*args, **kwargs):
        pass

    assert hook1 is not hook2

    session = requests.Session()
    session.hooks["response"].append(hook2)
    request = requests.Request("GET", httpbin(), hooks={"response": [hook1]})

    prepared = session.prepare_request(request)
    assert prepared.hooks["response"] == [hook1]
| 1198 | |
def test_prepared_request_hook(self, httpbin):
    """A hook attached before ``prepare()`` still runs when the request is sent."""

    def hook(resp, **kwargs):
        # Mark the response so the test can see the hook ran.
        resp.hook_working = True
        return resp

    prepared = requests.Request("GET", httpbin(), hooks={"response": hook}).prepare()

    session = requests.Session()
    session.proxies = getproxies()
    response = session.send(prepared)

    assert hasattr(response, "hook_working")
| 1212 | |
def test_prepared_from_session(self, httpbin):
    """Session-level auth is applied to requests prepared through the session."""

    class DummyAuth(requests.auth.AuthBase):
        """Auth handler that adds a marker header to the request."""

        def __call__(self, r):
            r.headers["Dummy-Auth-Test"] = "dummy-auth-test-ok"
            return r

    request = requests.Request("GET", httpbin("headers"))
    assert not request.auth

    session = requests.Session()
    session.auth = DummyAuth()

    response = session.send(session.prepare_request(request))
    echoed_headers = response.json()["headers"]

    assert echoed_headers["Dummy-Auth-Test"] == "dummy-auth-test-ok"
| 1229 | |
def test_prepare_request_with_bytestring_url(self):
    """A bytes URL is prepared into the equivalent ``str`` URL."""
    request = requests.Request("GET", b"https://httpbin.org/")
    prepared = requests.Session().prepare_request(request)
    assert prepared.url == "https://httpbin.org/"
| 1235 | |
def test_request_with_bytestring_host(self, httpbin):
    """A bytes ``Host`` header does not stop the response cookie being read."""
    session = requests.Session()
    bytes_host_headers = {"Host": b"httpbin.org"}
    response = session.request(
        "GET",
        httpbin("cookies/set?cookie=value"),
        allow_redirects=False,
        headers=bytes_host_headers,
    )
    assert response.cookies.get("cookie") == "value"
| 1245 | |
def test_links(self):
    """``Response.links`` exposes the ``rel="next"`` entry of the ``link`` header."""
    link_header = (
        "<https://api.github.com/users/kennethreitz/repos?"
        'page=2&per_page=10>; rel="next", <https://api.github.'
        "com/users/kennethreitz/repos?page=7&per_page=10>; "
        ' rel="last"'
    )

    response = requests.Response()
    response.headers = {
        "cache-control": "public, max-age=60, s-maxage=60",
        "connection": "keep-alive",
        "content-encoding": "gzip",
        "content-type": "application/json; charset=utf-8",
        "date": "Sat, 26 Jan 2013 16:47:56 GMT",
        "etag": '"6ff6a73c0e446c1f61614769e3ceb778"',
        "last-modified": "Sat, 26 Jan 2013 16:22:39 GMT",
        "link": link_header,
        "server": "GitHub.com",
        "status": "200 OK",
        "vary": "Accept",
        "x-content-type-options": "nosniff",
        "x-github-media-type": "github.beta",
        "x-ratelimit-limit": "60",
        "x-ratelimit-remaining": "57",
    }

    next_link = response.links["next"]
    assert next_link["rel"] == "next"
| 1271 | |
def test_cookie_parameters(self):
    """Attributes passed to ``RequestsCookieJar.set`` are stored on the cookie."""
    attributes = {
        "secure": True,
        "domain": "test.com",
        "rest": {"HttpOnly": True},
    }

    jar = requests.cookies.RequestsCookieJar()
    jar.set("some_cookie", "some_value", **attributes)

    assert len(jar) == 1
    assert "some_cookie" in jar

    stored = next(iter(jar))
    assert stored.secure == attributes["secure"]
    assert stored.domain == attributes["domain"]
    assert stored._rest["HttpOnly"] == attributes["rest"]["HttpOnly"]
| 1289 | |
def test_cookie_as_dict_keeps_len(self):
    """Converting a two-cookie jar to a dict, by any route, keeps both entries."""
    jar = requests.cookies.RequestsCookieJar()
    for name, value in (
        ("some_cookie", "some_value"),
        ("some_cookie1", "some_value1"),
    ):
        jar.set(name, value)

    from_jar = dict(jar)
    from_iteritems = dict(jar.iteritems())
    from_items = dict(jar.items())

    assert len(jar) == 2
    assert len(from_jar) == 2
    assert len(from_iteritems) == 2
    assert len(from_items) == 2
| 1309 | |
def test_cookie_as_dict_keeps_items(self):
    """Dicts built from a jar map cookie names to their values."""
    jar = requests.cookies.RequestsCookieJar()
    for name, value in (
        ("some_cookie", "some_value"),
        ("some_cookie1", "some_value1"),
    ):
        jar.set(name, value)

    from_jar = dict(jar)
    from_iteritems = dict(jar.iteritems())
    from_items = dict(jar.items())

    assert from_jar["some_cookie"] == "some_value"
    assert from_iteritems["some_cookie"] == "some_value"
    assert from_items["some_cookie1"] == "some_value1"
| 1328 | |
def test_cookie_as_dict_keys(self):
    """``jar.keys()`` equals its own list form and can be listed repeatedly."""
    jar = requests.cookies.RequestsCookieJar()
    for name, value in (
        ("some_cookie", "some_value"),
        ("some_cookie1", "some_value1"),
    ):
        jar.set(name, value)

    names = jar.keys()
    assert names == list(names)
    # make sure one can use keys multiple times
    assert list(names) == list(names)
| 1344 | |
def test_cookie_as_dict_values(self):
    """``jar.values()`` equals its own list form and can be listed repeatedly."""
    jar = requests.cookies.RequestsCookieJar()
    for name, value in (
        ("some_cookie", "some_value"),
        ("some_cookie1", "some_value1"),
    ):
        jar.set(name, value)

    cookie_values = jar.values()
    assert cookie_values == list(cookie_values)
    # make sure one can use values multiple times
    assert list(cookie_values) == list(cookie_values)
| 1360 | |
def test_cookie_as_dict_items(self):
    """``jar.items()`` equals its own list form and can be listed repeatedly."""
    jar = requests.cookies.RequestsCookieJar()
    for name, value in (
        ("some_cookie", "some_value"),
        ("some_cookie1", "some_value1"),
    ):
        jar.set(name, value)

    pairs = jar.items()
    assert pairs == list(pairs)
    # make sure one can use items multiple times
    assert list(pairs) == list(pairs)
| 1376 | |
def test_cookie_duplicate_names_different_domains(self):
    """Same-named cookies on two domains coexist; ``get`` needs a domain to pick one."""
    name, value = "some_cookie", "some_value"
    first_domain, second_domain = "test1.com", "test2.com"

    jar = requests.cookies.RequestsCookieJar()
    for domain in (first_domain, second_domain):
        jar.set(name, value, domain=domain)

    assert name in jar
    assert len(jar.items()) == 2

    # Verify that CookieConflictError is raised if domain is not specified
    with pytest.raises(requests.cookies.CookieConflictError):
        jar.get(name)

    # Verify that CookieConflictError is not raised if domain is specified
    assert jar.get(name, domain=first_domain) == value
| 1397 | |
def test_cookie_duplicate_names_raises_cookie_conflict_error(self):
    """Two same-named cookies, one set with a path, make a plain ``get`` raise."""
    name, value = "some_cookie", "some_value"

    jar = requests.cookies.RequestsCookieJar()
    jar.set(name, value, path="some_path")
    jar.set(name, value)

    with pytest.raises(requests.cookies.CookieConflictError):
        jar.get(name)
| 1408 | |
def test_cookie_policy_copy(self):
    """Copying a jar yields a jar whose policy is of the same custom class."""

    class MyCookiePolicy(cookielib.DefaultCookiePolicy):
        pass

    original = requests.cookies.RequestsCookieJar()
    original.set_policy(MyCookiePolicy())

    copied_policy = original.copy().get_policy()
    assert isinstance(copied_policy, MyCookiePolicy)
| 1416 | |
def test_time_elapsed_blank(self, httpbin):
    """``Response.elapsed`` reports a strictly positive duration for a real request."""
    response = requests.get(httpbin("get"))
    # ``timedelta.total_seconds()`` is the stdlib form of the
    # days/seconds/microseconds arithmetic this test used to spell out.
    assert response.elapsed.total_seconds() > 0.0
| 1424 | |
def test_empty_response_has_content_none(self):
    """A freshly constructed ``Response`` has ``content`` equal to ``None``."""
    blank = requests.Response()
    assert blank.content is None
| 1428 | |
def test_response_is_iterable(self):
    """Iterating a ``Response`` yields a truthy first chunk from its ``raw`` stream."""
    response = requests.Response()
    # Named ``stream`` rather than ``io`` so the module-level ``io`` import
    # is not shadowed inside this test.
    stream = StringIO.StringIO("abc")
    original_read = stream.read

    def read_mock(amt, decode_content=None):
        # Accepts and ignores ``decode_content``; forwards only the amount.
        return original_read(amt)

    stream.read = read_mock
    response.raw = stream
    try:
        assert next(iter(response))
    finally:
        # Close the stream even when the assertion above fails.
        stream.close()
| 1441 | |
def test_response_decode_unicode(self):
    """When called with decode_unicode, Response.iter_content should always
    return unicode.
    """

    def yields_only_text(response):
        decoded = response.iter_content(decode_unicode=True)
        return all(isinstance(chunk, str) for chunk in decoded)

    # content that has already been consumed
    consumed = requests.Response()
    consumed._content_consumed = True
    consumed._content = b"the content"
    consumed.encoding = "ascii"
    assert yields_only_text(consumed)

    # also for streaming
    streaming = requests.Response()
    streaming.raw = io.BytesIO(b"the content")
    streaming.encoding = "ascii"
    assert yields_only_text(streaming)
| 1460 | |
def test_response_reason_unicode(self):
    """``Response.ok`` can be evaluated when ``reason`` is non-ASCII bytes."""
    # check for unicode HTTP status
    response = requests.Response()
    response.status_code = 404
    response.url = "unicode URL"
    response.encoding = None
    response.reason = "Komponenttia ei löydy".encode()
    assert not response.ok  # old behaviour - crashes here
| 1469 | |
def test_response_reason_unicode_fallback(self):
    """The HTTPError message contains a latin-1 encoded reason, decoded."""
    # check raise_status falls back to ISO-8859-1
    reason_text = "Komponenttia ei löydy"

    response = requests.Response()
    response.status_code = 500
    response.url = "some url"
    response.encoding = None
    response.reason = reason_text.encode("latin-1")

    with pytest.raises(requests.exceptions.HTTPError) as excinfo:
        response.raise_for_status()

    assert reason_text in excinfo.value.args[0]
| 1481 | |
def test_response_chunk_size_type(self):
    """Ensure that chunk_size is passed as None or an integer, otherwise
    raise a TypeError.
    """

    def fresh_response():
        # Each scenario gets its own response over an unread byte stream.
        response = requests.Response()
        response.raw = io.BytesIO(b"the content")
        return response

    # Integer chunk size: every chunk has exactly that length.
    one_byte_chunks = fresh_response().iter_content(1)
    assert all(len(chunk) == 1 for chunk in one_byte_chunks)

    # ``None``: the content comes back as a single chunk.
    whole = fresh_response().iter_content(None)
    assert list(whole) == [b"the content"]

    # Any other type is rejected.
    rejected = fresh_response()
    with pytest.raises(TypeError):
        rejected.iter_content("1024")
| 1500 | |
@pytest.mark.parametrize(
    "exception, args, expected",
    (
        (urllib3.exceptions.ProtocolError, tuple(), ChunkedEncodingError),
        (urllib3.exceptions.DecodeError, tuple(), ContentDecodingError),
        (urllib3.exceptions.ReadTimeoutError, (None, "", ""), ConnectionError),
        (urllib3.exceptions.SSLError, tuple(), RequestsSSLError),
    ),
)
def test_iter_content_wraps_exceptions(self, httpbin, exception, args, expected):
    """urllib3 errors raised while streaming surface as the ``expected`` exception."""
    # ReadTimeoutError can't be initialized by mock
    # so we'll manually create the instance with args
    failure = exception(*args)

    response = requests.Response()
    response.raw = mock.Mock()
    response.raw.stream.side_effect = failure

    with pytest.raises(expected):
        next(response.iter_content(1024))
| 1519 | |
def test_request_and_response_are_pickleable(self, httpbin):
    """A response and its request survive a pickle round trip."""
    original = requests.get(httpbin("get"))

    # verify we can pickle the original request
    request_copy = pickle.loads(pickle.dumps(original.request))
    assert request_copy

    # verify we can pickle the response and that we have access to
    # the original request.
    restored = pickle.loads(pickle.dumps(original))
    assert original.request.url == restored.request.url
    assert original.request.headers == restored.request.headers
| 1531 | |
def test_prepared_request_is_pickleable(self, httpbin):
    """A ``PreparedRequest`` round-trips through pickle and can then be sent."""
    prepared = requests.Request("GET", httpbin("get")).prepare()

    # Verify PreparedRequest can be pickled and unpickled
    clone = pickle.loads(pickle.dumps(prepared))
    for attribute in ("url", "headers", "body"):
        assert getattr(clone, attribute) == getattr(prepared, attribute)

    # Verify unpickled PreparedRequest sends properly
    response = requests.Session().send(clone)
    assert response.status_code == 200
| 1545 | |
def test_prepared_request_with_file_is_pickleable(self, httpbin):
    """A ``PreparedRequest`` built from a file upload round-trips through pickle."""
    with open(__file__, "rb") as source:
        request = requests.Request("POST", httpbin("post"), files={"file": source})
        prepared = request.prepare()

    # Verify PreparedRequest can be pickled and unpickled
    clone = pickle.loads(pickle.dumps(prepared))
    for attribute in ("url", "headers", "body"):
        assert getattr(clone, attribute) == getattr(prepared, attribute)

    # Verify unpickled PreparedRequest sends properly
    response = requests.Session().send(clone)
    assert response.status_code == 200
| 1561 | |
def test_prepared_request_with_hook_is_pickleable(self, httpbin):
    """A ``PreparedRequest`` carrying the default hooks round-trips through pickle."""
    request = requests.Request("GET", httpbin("get"), hooks=default_hooks())
    prepared = request.prepare()

    # Verify PreparedRequest can be pickled
    clone = pickle.loads(pickle.dumps(prepared))
    for attribute in ("url", "headers", "body", "hooks"):
        assert getattr(clone, attribute) == getattr(prepared, attribute)

    # Verify unpickled PreparedRequest sends properly
    response = requests.Session().send(clone)
    assert response.status_code == 200
| 1577 | |
def test_cannot_send_unprepared_requests(self, httpbin):
    """``Session.send`` raises ValueError when given a plain ``Request``."""
    unprepared = requests.Request(url=httpbin())
    session = requests.Session()
    with pytest.raises(ValueError):
        session.send(unprepared)
| 1582 | |
def test_http_error(self):
    """``HTTPError`` keeps the ``response`` it was built with, and its message."""
    HTTPError = requests.exceptions.HTTPError

    # No response supplied.
    bare = HTTPError()
    assert not bare.response

    # Response only.
    response = requests.Response()
    with_response = HTTPError(response=response)
    assert with_response.response == response

    # Message and response together.
    with_message = HTTPError("message", response=response)
    assert str(with_message) == "message"
    assert with_message.response == response
| 1592 | |
def test_session_pickling(self, httpbin):
    """A ``Session`` restored from pickle can still send a request."""
    request = requests.Request("GET", httpbin("get"))

    restored_session = pickle.loads(pickle.dumps(requests.Session()))
    restored_session.proxies = getproxies()

    response = restored_session.send(request.prepare())
    assert response.status_code == 200
| 1602 | |
def test_fixes_1329(self, httpbin):
    """Ensure that header updates are done case-insensitively."""
    session = requests.Session()
    # The second update must replace the first despite the different casing.
    session.headers.update({"ACCEPT": "BOGUS"})
    session.headers.update({"accept": "application/json"})

    sent_headers = session.get(httpbin("get")).request.headers
    for spelling in ("accept", "Accept", "ACCEPT"):
        assert sent_headers[spelling] == "application/json"
| 1613 | |
def test_uppercase_scheme_redirect(self, httpbin):
    """A redirect to a URL with an upper-case ``HTTP://`` scheme is followed."""
    target = urlparse(httpbin("html"))
    shouting_url = "HTTP://" + target.netloc + target.path

    response = requests.get(httpbin("redirect-to"), params={"url": shouting_url})

    assert response.status_code == 200
    assert response.url.lower() == shouting_url.lower()
| 1620 | |
def test_transport_adapter_ordering(self):
    """Mounted adapters are ordered longest prefix first, as the lists below show."""
    session = requests.Session()
    assert list(session.adapters) == ["https://", "http://"]

    for prefix in (
        "http://git",
        "http://github",
        "http://github.com",
        "http://github.com/about/",
    ):
        session.mount(prefix, HTTPAdapter())
    assert list(session.adapters) == [
        "http://github.com/about/",
        "http://github.com",
        "http://github",
        "http://git",
        "https://",
        "http://",
    ]

    for prefix in (
        "http://gittip",
        "http://gittip.com",
        "http://gittip.com/about/",
    ):
        session.mount(prefix, HTTPAdapter())
    assert list(session.adapters) == [
        "http://github.com/about/",
        "http://gittip.com/about/",
        "http://github.com",
        "http://gittip.com",
        "http://github",
        "http://gittip",
        "http://git",
        "https://",
        "http://",
    ]

    # Mounting also works after ``adapters`` was replaced by a plain dict.
    plain_dict_session = requests.Session()
    plain_dict_session.adapters = {"http://": HTTPAdapter()}
    plain_dict_session.mount("https://", HTTPAdapter())
    assert "http://" in plain_dict_session.adapters
    assert "https://" in plain_dict_session.adapters
| 1658 | |
def test_session_get_adapter_prefix_matching(self):
    """``get_adapter`` returns the adapter mounted on the most specific matching prefix."""
    base_prefix = "https://example.com"
    nested_prefix = base_prefix + "/some/path"

    base_adapter = HTTPAdapter()
    nested_adapter = HTTPAdapter()

    session = requests.Session()
    session.mount(base_prefix, base_adapter)
    session.mount(nested_prefix, nested_adapter)

    # Matches only the shorter prefix.
    assert session.get_adapter(base_prefix + "/another/path") is base_adapter

    # Matches both prefixes: the longer one is chosen.
    assert session.get_adapter(nested_prefix + "/longer/path") is nested_adapter

    # Matches neither custom prefix.
    unrelated = session.get_adapter("https://another.example.com/")
    assert unrelated not in (base_adapter, nested_adapter)
| 1682 | |
def test_session_get_adapter_prefix_matching_mixed_case(self):
    """A mixed-case prefix still matches a URL that starts with it verbatim."""
    prefix = "hTtPs://eXamPle.CoM/MixEd_CAse_PREfix"

    session = requests.Session()
    mounted = HTTPAdapter()
    session.mount(prefix, mounted)

    assert session.get_adapter(prefix + "/full_url") is mounted
| 1692 | |
def test_session_get_adapter_prefix_matching_is_case_insensitive(self):
    """A URL whose letter case differs from the mounted prefix still matches."""
    prefix = "hTtPs://eXamPle.CoM/MixEd_CAse_PREfix"
    differently_cased_url = "HtTpS://exaMPLe.cOm/MiXeD_caSE_preFIX/another_url"

    session = requests.Session()
    mounted = HTTPAdapter()
    session.mount(prefix, mounted)

    assert session.get_adapter(differently_cased_url) is mounted
| 1704 | |
def test_session_get_adapter_prefix_with_trailing_slash(self):
    """A prefix ending in ``/`` must not match a longer hostname."""
    # from issue #6935
    session = requests.Session()
    mounted = HTTPAdapter()
    session.mount("https://example.com/", mounted)  # trailing slash

    same_host = session.get_adapter("https://example.com/some/path")
    other_host = session.get_adapter("https://example.com.other.com/some/path")

    assert same_host is mounted
    assert other_host is not mounted
| 1717 | |
def test_session_get_adapter_prefix_without_trailing_slash(self):
    """A prefix without a trailing ``/`` also matches an extended hostname."""
    # from issue #6935
    session = requests.Session()
    mounted = HTTPAdapter()
    session.mount("https://example.com", mounted)  # no trailing slash

    same_host = session.get_adapter("https://example.com/some/path")
    extended_host = session.get_adapter("https://example.com.other.com/some/path")

    assert same_host is mounted
    assert extended_host is mounted
| 1730 | |
def test_header_remove_is_case_insensitive(self, httpbin):
    """Passing ``None`` for a header drops the session header, whatever its case."""
    # From issue #1321
    session = requests.Session()
    session.headers["foo"] = "bar"
    response = session.get(httpbin("get"), headers={"FOO": None})
    assert "foo" not in response.request.headers
| 1737 | |
def test_params_are_merged_case_sensitive(self, httpbin):
    """Session and request params differing only in case are both sent."""
    session = requests.Session()
    session.params["foo"] = "bar"
    response = session.get(httpbin("get"), params={"FOO": "bar"})
    echoed_args = response.json()["args"]
    assert echoed_args == {"foo": "bar", "FOO": "bar"}
| 1743 | |
def test_long_authinfo_in_url(self):
    """Preparing a URL with long userinfo and a long host leaves it unchanged."""
    username = "E8A3BE87-9E3F-4620-8858-95478E385B5B"
    password = "EA770032-DA4D-4D84-8CE9-29C6D910BF1E"
    hostname = "exactly-------------sixty-----------three------------characters"
    url = "http://{}:{}@{}:9000/path?query#frag".format(username, password, hostname)

    prepared = requests.Request("GET", url).prepare()
    assert prepared.url == url
| 1752 | |
def test_header_keys_are_native(self, httpbin):
    """Header names given as ``str`` or ``bytes`` are both found under ``str`` keys."""
    request = requests.Request(
        "GET", httpbin("get"), headers={"unicode": "blah", b"byte": "blah"}
    )
    prepared = request.prepare()

    # This is testing that they are builtin strings. A bit weird, but there
    # we go.
    header_names = prepared.headers.keys()
    assert "unicode" in header_names
    assert "byte" in header_names
| 1762 | |
def test_header_validation(self, httpbin):
    """Ensure prepare_headers regex isn't flagging valid header contents."""
    valid_headers = {
        "foo": "bar baz qux",
        "bar": b"fbbq",
        "baz": "",
        "qux": "1",
    }
    response = requests.get(httpbin("get"), headers=valid_headers)
    sent_headers = response.request.headers
    for name, value in valid_headers.items():
        assert value == sent_headers[name]
| 1774 | |
@pytest.mark.parametrize(
    "invalid_header, key",
    (
        ({"foo": 3}, "foo"),
        ({"bar": {"foo": "bar"}}, "bar"),
        ({"baz": ["foo", "bar"]}, "baz"),
    ),
)
def test_header_value_not_str(self, httpbin, invalid_header, key):
    """Header values that are neither str nor bytes raise ``InvalidHeader``.

    The error message is expected to mention the offending header name
    (see GH issue #3386).
    """
    url = httpbin("get")
    with pytest.raises(InvalidHeader) as excinfo:
        requests.get(url, headers=invalid_header)
    message = str(excinfo.value)
    assert key in message
| 1790 | |
@pytest.mark.parametrize(
    "invalid_header",
    (
        {"foo": "bar\r\nbaz: qux"},
        {"foo": "bar\n\rbaz: qux"},
        {"foo": "bar\nbaz: qux"},
        {"foo": "bar\rbaz: qux"},
        {"fo\ro": "bar"},
        {"fo\r\no": "bar"},
        {"fo\n\ro": "bar"},
        {"fo\no": "bar"},
        {"foo": "bar\n"},
        {"foo\n": "bar"},
        {"foo": "bar\r\n"},
        {"foo": "\n"},
        {"foo": "\r\n"},
    ),
)
def test_header_no_return_chars(self, httpbin, invalid_header):
    """CR/LF sequences in a header name or value must raise ``InvalidHeader``.

    Without this check a single string could be split into several headers.
    """
    url = httpbin("get")
    with pytest.raises(InvalidHeader):
        requests.get(url, headers=invalid_header)
| 1815 | |
@pytest.mark.parametrize(
    "invalid_header",
    (
        # Header names with leading whitespace: one space, a tab, several spaces.
        {" foo": "bar"},
        {"\tfoo": "bar"},
        {"    foo": "bar"},
        # Header values with leading whitespace: one space, several spaces, a tab.
        {"foo": " bar"},
        {"foo": "    bar"},
        {"foo": "\tbar"},
        # Whitespace-only header name.
        {" ": "bar"},
    ),
)
def test_header_no_leading_space(self, httpbin, invalid_header):
    """Ensure headers containing leading whitespace raise
    InvalidHeader Error before sending.

    Each case is distinct: single-space, tab and multi-space prefixes are
    covered separately for both the header name and the header value.
    """
    with pytest.raises(InvalidHeader):
        requests.get(httpbin("get"), headers=invalid_header)
| 1834 | |
def test_header_with_subclass_types(self, httpbin):
    """Headers built from plain ``str``/``bytes`` subclasses are accepted.

    Subclasses that do not behave *exactly* like the base bytes/str
    classes are not supported. This test is for backwards compatibility.
    """

    class MyString(str):
        pass

    class MyBytes(bytes):
        pass

    url = httpbin("get")

    # str-subclass name, plain str value
    str_response = requests.get(url, headers={MyString("x-custom"): "myheader"})
    assert str_response.request.headers["x-custom"] == "myheader"

    # bytes-subclass name, plain bytes value
    bytes_response = requests.get(url, headers={MyBytes(b"x-custom"): b"myheader"})
    assert bytes_response.request.headers["x-custom"] == b"myheader"

    # str-subclass name, bytes-subclass value
    mixed_response = requests.get(
        url, headers={MyString("x-custom"): MyBytes(b"myheader")}
    )
    assert mixed_response.request.headers["x-custom"] == b"myheader"
| 1859 | |
@pytest.mark.parametrize("files", ("foo", b"foo", bytearray(b"foo")))
def test_can_send_objects_with_files(self, httpbin, files):
    """Form data plus a str/bytes/bytearray "file" prepares as multipart."""
    request = requests.Request(
        "POST",
        httpbin("post"),
        data={"a": "this is a string"},
        files={"b": files},
    )
    prepared = request.prepare()
    assert "multipart/form-data" in prepared.headers["Content-Type"]
| 1867 | |
def test_can_send_file_object_with_non_string_filename(self, httpbin):
    """A file object whose ``name`` is an int can still be prepared as multipart."""
    stream = io.BytesIO()
    stream.name = 2
    prepared = requests.Request("POST", httpbin("post"), files={"f": stream}).prepare()

    assert "multipart/form-data" in prepared.headers["Content-Type"]
| 1875 | |
def test_autoset_header_values_are_native(self, httpbin):
    """The automatically set Content-Length is the ``str`` form of the body length."""
    body = "this is a string"
    expected_length = "16"
    prepared = requests.Request("POST", httpbin("post"), data=body).prepare()

    assert prepared.headers["Content-Length"] == expected_length
| 1883 | |
def test_nonhttp_schemes_dont_check_URLs(self):
    """URLs with non-HTTP schemes come out of ``prepare()`` unmodified."""
    non_http_urls = (
        "data:image/gif;base64,R0lGODlhAQABAHAAACH5BAUAAAAALAAAAAABAAEAAAICRAEAOw==",
        "file:///etc/passwd",
        "magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431",
    )
    for original_url in non_http_urls:
        prepared = requests.Request("GET", original_url).prepare()
        assert original_url == prepared.url
| 1894 | |
def test_auth_is_stripped_on_http_downgrade(
    self, httpbin, httpbin_secure, httpbin_ca_bundle
):
    """Authorization is sent on the HTTPS hop but not on the HTTP redirect target."""
    insecure_target = httpbin("get")
    response = requests.get(
        httpbin_secure("redirect-to"),
        params={"url": insecure_target},
        auth=("user", "pass"),
        verify=httpbin_ca_bundle,
    )
    first_hop_headers = response.history[0].request.headers
    assert first_hop_headers["Authorization"]
    assert "Authorization" not in response.request.headers
| 1906 | |
def test_auth_is_retained_for_redirect_on_host(self, httpbin):
    """The Authorization header is identical before and after a same-host redirect."""
    response = requests.get(httpbin("redirect/1"), auth=("user", "pass"))
    before_redirect = response.history[0].request.headers["Authorization"]
    after_redirect = response.request.headers["Authorization"]

    assert before_redirect == after_redirect
| 1913 | |
def test_should_strip_auth_host_change(self):
    """Auth is stripped when the redirect goes to a different host."""
    session = requests.Session()
    old_url = "http://example.com/foo"
    new_url = "http://another.example.com/"
    assert session.should_strip_auth(old_url, new_url)
| 1919 | |
def test_should_strip_auth_http_downgrade(self):
    """Auth is stripped when the redirect goes from https to http."""
    session = requests.Session()
    old_url = "https://example.com/foo"
    new_url = "http://example.com/bar"
    assert session.should_strip_auth(old_url, new_url)
| 1923 | |
def test_should_strip_auth_https_upgrade(self):
    """Auth survives an http -> https upgrade only on the standard ports."""
    session = requests.Session()

    # Standard ports, implicit or explicit: auth is kept.
    kept = (
        ("http://example.com/foo", "https://example.com/bar"),
        ("http://example.com:80/foo", "https://example.com/bar"),
        ("http://example.com/foo", "https://example.com:443/bar"),
    )
    for old_url, new_url in kept:
        assert not session.should_strip_auth(old_url, new_url)

    # Non-standard ports should trigger stripping
    stripped = (
        ("http://example.com:8080/foo", "https://example.com/bar"),
        ("http://example.com/foo", "https://example.com:8443/bar"),
    )
    for old_url, new_url in stripped:
        assert session.should_strip_auth(old_url, new_url)
| 1942 | |
def test_should_strip_auth_port_change(self):
    """Auth is stripped when both URLs use different non-default ports."""
    session = requests.Session()
    old_url = "http://example.com:1234/foo"
    new_url = "https://example.com:4321/bar"
    assert session.should_strip_auth(old_url, new_url)
| 1948 | |
@pytest.mark.parametrize(
    "old_uri, new_uri",
    (
        ("https://example.com:443/foo", "https://example.com/bar"),
        ("http://example.com:80/foo", "http://example.com/bar"),
        ("https://example.com/foo", "https://example.com:443/bar"),
        ("http://example.com/foo", "http://example.com:80/bar"),
    ),
)
def test_should_strip_auth_default_port(self, old_uri, new_uri):
    """An explicit default port and an omitted port count as the same origin."""
    session = requests.Session()
    strip = session.should_strip_auth(old_uri, new_uri)
    assert not strip
| 1961 | |
def test_manual_redirect_with_partial_body_read(self, httpbin):
    """Redirects can be followed manually after a partial or full body read.

    The first response is only partially consumed before the redirect
    generator is advanced; the second is fully consumed via
    ``iter_content`` before advancing again.
    """
    s = requests.Session()
    r1 = s.get(httpbin("redirect/2"), allow_redirects=False, stream=True)
    assert r1.is_redirect
    rg = s.resolve_redirects(r1, r1.request, stream=True)

    # read only the first eight bytes of the response body,
    # then follow the redirect.
    # ``iter_content`` returns a lazy generator, so it has to be advanced
    # for any bytes to actually be read; the ``None`` default keeps this
    # from raising if the redirect response carries no body.
    next(r1.iter_content(8), None)
    r2 = next(rg)
    assert r2.is_redirect

    # read all of the response via iter_content,
    # then follow the redirect
    for _ in r2.iter_content():
        pass
    r3 = next(rg)
    assert not r3.is_redirect
| 1980 | |
def test_prepare_body_position_non_stream(self):
    """A plain ``bytes`` body leaves ``_body_position`` unset."""
    request = requests.Request("GET", "http://example.com", data=b"the data")
    prepared = request.prepare()
    assert prepared._body_position is None
| 1985 | |
def test_rewind_body(self):
    """``rewind_body`` makes a fully read stream readable from the start again."""
    payload = b"the data"
    stream = io.BytesIO(payload)
    prepared = requests.Request("GET", "http://example.com", data=stream).prepare()
    assert prepared._body_position == 0
    assert prepared.body.read() == payload

    # the data has all been read
    assert prepared.body.read() == b""

    # rewind it back
    requests.utils.rewind_body(prepared)
    assert prepared.body.read() == payload
| 1998 | |
def test_rewind_partially_read_body(self):
    """``rewind_body`` returns to the offset the stream had at prepare time."""
    stream = io.BytesIO(b"the data")
    stream.read(4)  # read some data
    prepared = requests.Request("GET", "http://example.com", data=stream).prepare()
    assert prepared._body_position == 4

    remainder = b"data"
    assert prepared.body.read() == remainder

    # the data has all been read
    assert prepared.body.read() == b""

    # rewind it back
    requests.utils.rewind_body(prepared)
    assert prepared.body.read() == remainder
| 2012 | |
def test_rewind_body_no_seek(self):
    """A body object that has ``tell`` but no ``seek`` cannot be rewound."""

    class BadFileObj:
        def __init__(self, data):
            self.data = data

        def tell(self):
            return 0

        # Only needs to exist; it is never expected to yield anything here.
        def __iter__(self):
            return

    data = BadFileObj("the data")
    prep = requests.Request("GET", "http://example.com", data=data).prepare()
    assert prep._body_position == 0

    with pytest.raises(UnrewindableBodyError) as e:
        requests.utils.rewind_body(prep)

    # Check the exception's own message rather than the repr of the
    # pytest ExceptionInfo wrapper.
    assert "Unable to rewind request body" in str(e.value)
| 2032 | |
def test_rewind_body_failed_seek(self):
    """A body whose ``seek`` raises ``OSError`` surfaces as UnrewindableBodyError."""

    class BadFileObj:
        def __init__(self, data):
            self.data = data

        def tell(self):
            return 0

        def seek(self, pos, whence=0):
            raise OSError()

        # Only needs to exist; it is never expected to yield anything here.
        def __iter__(self):
            return

    data = BadFileObj("the data")
    prep = requests.Request("GET", "http://example.com", data=data).prepare()
    assert prep._body_position == 0

    with pytest.raises(UnrewindableBodyError) as e:
        requests.utils.rewind_body(prep)

    # Check the exception's own message rather than the repr of the
    # pytest ExceptionInfo wrapper.
    assert "error occurred when rewinding request body" in str(e.value)
| 2055 | |
def test_rewind_body_failed_tell(self):
    """A body whose ``tell`` raises ``OSError`` cannot be rewound later."""

    class BadFileObj:
        def __init__(self, data):
            self.data = data

        def tell(self):
            raise OSError()

        # Only needs to exist; it is never expected to yield anything here.
        def __iter__(self):
            return

    data = BadFileObj("the data")
    prep = requests.Request("GET", "http://example.com", data=data).prepare()
    # The position is recorded as some non-None marker even though tell() failed.
    assert prep._body_position is not None

    with pytest.raises(UnrewindableBodyError) as e:
        requests.utils.rewind_body(prep)

    # Check the exception's own message rather than the repr of the
    # pytest ExceptionInfo wrapper.
    assert "Unable to rewind request body" in str(e.value)
| 2075 | |
def test_getattr_proxy_stream_follows_redirect(self, httpbin):
    """Ensure stream wrappers that don't implement __iter__ directly are still detected."""

    class AttrProxy:
        """Forward every attribute lookup to a wrapped ``BytesIO``."""

        def __init__(self):
            self._file = io.BytesIO(b"data")

        def __getattr__(self, name):
            return getattr(self._file, name)

    redirect_url = httpbin("redirect-to?url=/post&status_code=307")
    response = requests.post(redirect_url, data=AttrProxy())
    assert response.json()["data"] == "data"
| 2090 | |
def _patch_adapter_gzipped_redirect(self, session, url):
    """Make the adapter for *url* label its first response as gzip-encoded.

    ``build_response`` on the adapter is wrapped so that the first response
    it builds gets ``content-encoding: gzip`` set on its raw headers; later
    responses pass through untouched.
    """
    target_adapter = session.get_adapter(url=url)
    original_build_response = target_adapter.build_response
    self._patched_response = False

    def build_response(*args, **kwargs):
        response = original_build_response(*args, **kwargs)
        if self._patched_response:
            # Only the first response is tampered with.
            return response
        response.raw.headers["content-encoding"] = "gzip"
        self._patched_response = True
        return response

    target_adapter.build_response = build_response
| 2104 | |
def test_redirect_with_wrong_gzipped_header(self, httpbin):
    """A redirect response wrongly labelled as gzip must not break the request."""
    redirect_url = httpbin("redirect/1")
    session = requests.Session()
    self._patch_adapter_gzipped_redirect(session, redirect_url)
    session.get(redirect_url)
| 2110 | |
@pytest.mark.parametrize(
    "username, password, auth_str",
    (
        ("test", "test", "Basic dGVzdDp0ZXN0"),
        (
            "имя".encode(),
            "пароль".encode(),
            "Basic 0LjQvNGPOtC/0LDRgNC+0LvRjA==",
        ),
    ),
)
def test_basic_auth_str_is_always_native(self, username, password, auth_str):
    """``_basic_auth_str`` returns a native string with the expected value."""
    header_value = _basic_auth_str(username, password)
    assert isinstance(header_value, builtin_str)
    assert header_value == auth_str
| 2126 | |
def test_requests_history_is_saved(self, httpbin):
    """Each response in the chain holds the history of the hops before it."""
    response = requests.get(httpbin("redirect/5"))
    longest_history = response.history[-1].history
    for hops_so_far, intermediate in enumerate(response.history):
        assert intermediate.history == longest_history[0:hops_so_far]
| 2134 | |
def test_json_param_post_content_type_works(self, httpbin):
    """``json=`` sets a JSON Content-Type and the payload is echoed back."""
    payload = {"life": 42}
    response = requests.post(httpbin("post"), json=payload)
    assert response.status_code == 200
    assert "application/json" in response.request.headers["Content-Type"]
    assert {"life": 42} == response.json()["json"]
| 2140 | |
def test_json_param_post_should_not_override_data_param(self, httpbin):
    """When both ``data`` and ``json`` are given, the body comes from ``data``."""
    request = requests.Request(
        method="POST",
        url=httpbin("post"),
        data={"stuff": "elixr"},
        json={"music": "flute"},
    )
    prepared = request.prepare()
    assert "stuff=elixr" == prepared.body
| 2150 | |
def test_response_iter_lines(self, httpbin):
    """After taking one line from a 4-line stream, three lines remain."""
    response = requests.get(httpbin("stream/4"), stream=True)
    assert response.status_code == 200

    lines = response.iter_lines()
    next(lines)
    remaining = list(lines)
    assert len(remaining) == 3
| 2158 | |
def test_response_context_manager(self, httpbin):
    """Leaving a ``with`` block on a response closes its raw stream."""
    stream_url = httpbin("stream/4")
    with requests.get(stream_url, stream=True) as response:
        assert isinstance(response, requests.Response)

    assert response.raw.closed
| 2164 | |
def test_unconsumed_session_response_closes_connection(self, httpbin):
    """Closing an unread streamed response closes the raw stream without reading it."""
    session = requests.session()
    streamed = session.get(httpbin("stream/4"), stream=True)

    with contextlib.closing(streamed) as response:
        pass

    assert response._content_consumed is False
    assert response.raw.closed
| 2173 | |
@pytest.mark.xfail
def test_response_iter_lines_reentrant(self, httpbin):
    """Response.iter_lines() is not reentrant safe"""
    response = requests.get(httpbin("stream/4"), stream=True)
    assert response.status_code == 200

    # Take one line from a first iterator, then count lines from a second one.
    next(response.iter_lines())
    remaining = list(response.iter_lines())
    assert len(remaining) == 3
| 2182 | |
def test_session_close_proxy_clear(self):
    """``Session.close`` calls ``clear()`` once on every proxy manager entry."""
    first_proxy = mock.Mock()
    second_proxy = mock.Mock()
    proxies = {"one": first_proxy, "two": second_proxy}

    session = requests.Session()
    proxy_manager = session.adapters["http://"].proxy_manager
    with mock.patch.dict(proxy_manager, proxies):
        session.close()
        first_proxy.clear.assert_called_once_with()
        second_proxy.clear.assert_called_once_with()
| 2193 | |
def test_proxy_auth(self):
    """Credentials in a proxy URL become a Basic Proxy-Authorization header."""
    proxy_headers = HTTPAdapter().proxy_headers("http://user:pass@httpbin.org")
    assert proxy_headers == {"Proxy-Authorization": "Basic dXNlcjpwYXNz"}
| 2198 | |
def test_proxy_auth_empty_pass(self):
    """A proxy URL with an empty password still yields a Proxy-Authorization header."""
    proxy_headers = HTTPAdapter().proxy_headers("http://user:@httpbin.org")
    assert proxy_headers == {"Proxy-Authorization": "Basic dXNlcjo="}
| 2203 | |
def test_response_json_when_content_is_None(self, httpbin):
    """``Response.json()`` raises ``ValueError`` when ``content`` is ``None``."""
    response = requests.get(httpbin("/status/204"))

    # Make sure r.content is None
    response.status_code = 0
    response._content = False
    response._content_consumed = False
    assert response.content is None

    with pytest.raises(ValueError):
        response.json()
| 2214 | |
def test_response_without_release_conn(self):
    """Test `close` call for non-urllib3-like raw objects.

    `Response.close` should still close `response.raw` when the raw object
    has no `release_conn` attribute.
    """
    response = requests.Response()
    raw_stream = StringIO.StringIO("test")
    response.raw = raw_stream
    assert not response.raw.closed

    response.close()
    assert response.raw.closed
| 2224 | |
def test_empty_stream_with_auth_does_not_set_content_length_header(self, httpbin):
    """Ensure that a byte stream with size 0 will not set both a Content-Length
    and Transfer-Encoding header.
    """
    empty_stream = io.BytesIO(b"")
    request = requests.Request(
        "POST", httpbin("post"), auth=("user", "pass"), data=empty_stream
    )
    headers = request.prepare().headers
    assert "Transfer-Encoding" in headers
    assert "Content-Length" not in headers
| 2236 | |
def test_stream_with_auth_does_not_set_transfer_encoding_header(self, httpbin):
    """Ensure that a byte stream with size > 0 will not set both a Content-Length
    and Transfer-Encoding header.
    """
    non_empty_stream = io.BytesIO(b"test data")
    request = requests.Request(
        "POST", httpbin("post"), auth=("user", "pass"), data=non_empty_stream
    )
    headers = request.prepare().headers
    assert "Transfer-Encoding" not in headers
    assert "Content-Length" in headers
| 2248 | |
def test_chunked_upload_does_not_set_content_length_header(self, httpbin):
    """Ensure that requests with a generator body stream using
    Transfer-Encoding: chunked, not a Content-Length header.
    """
    chunks = (chunk for chunk in [b"a", b"b", b"c"])
    request = requests.Request("POST", httpbin("post"), data=chunks)
    headers = request.prepare().headers
    assert "Transfer-Encoding" in headers
    assert "Content-Length" not in headers
| 2259 | |
def test_custom_redirect_mixin(self, httpbin):
    """Tests a custom mixin to overwrite ``get_redirect_target``.

    Ensures a subclassed ``requests.Session`` can handle a certain type of
    malformed redirect responses.

    1. original request receives a proper response: 302 redirect
    2. following the redirect, a malformed response is given:
        status code = HTTP 200
        location = alternate url
    3. the custom session catches the edge case and follows the redirect
    """
    # Build the chain back to front: final page <- malformed hop <- real redirect.
    url_final = httpbin("html")
    malformed_query = urlencode({"location": url_final})
    url_redirect_malformed = httpbin("response-headers?%s" % malformed_query)
    redirect_query = urlencode({"url": url_redirect_malformed})
    url_redirect = httpbin("redirect-to?%s" % redirect_query)

    class CustomRedirectSession(requests.Session):
        def get_redirect_target(self, resp):
            # default behavior
            if resp.is_redirect:
                return resp.headers["location"]
            # edge case - check to see if 'location' is in headers anyways
            location = resp.headers.get("location")
            return location if location and location != resp.url else None

    response = CustomRedirectSession().get(url_redirect)
    first_hop, second_hop = response.history[0], response.history[1]

    assert len(response.history) == 2
    assert response.status_code == 200
    assert first_hop.status_code == 302
    assert first_hop.is_redirect
    assert second_hop.status_code == 200
    assert not second_hop.is_redirect
    assert response.url == url_final
| 2303 | |
| 2304 | |
class TestCaseInsensitiveDict:
    """Behavioral tests for ``CaseInsensitiveDict``."""

    @pytest.mark.parametrize(
        "cid",
        (
            CaseInsensitiveDict({"Foo": "foo", "BAr": "bar"}),
            CaseInsensitiveDict([("Foo", "foo"), ("BAr", "bar")]),
            CaseInsensitiveDict(FOO="foo", BAr="bar"),
        ),
    )
    def test_init(self, cid):
        """Mapping, pair-list and keyword construction all yield two entries."""
        assert len(cid) == 2
        for lowered_key in ("foo", "bar"):
            assert lowered_key in cid

    def test_docstring_example(self):
        """Lookup ignores case while iteration yields the key as it was set."""
        headers = CaseInsensitiveDict()
        headers["Accept"] = "application/json"
        assert headers["aCCEPT"] == "application/json"
        assert list(headers) == ["Accept"]

    def test_len(self):
        """Setting a key that differs only in case does not add an entry."""
        mapping = CaseInsensitiveDict({"a": "a", "b": "b"})
        mapping["A"] = "a"
        assert len(mapping) == 2

    def test_getitem(self):
        """Item access works with any casing of the key."""
        mapping = CaseInsensitiveDict({"Spam": "blueval"})
        for key in ("spam", "SPAM"):
            assert mapping[key] == "blueval"

    def test_fixes_649(self):
        """__setitem__ should behave case-insensitively."""
        mapping = CaseInsensitiveDict()
        mapping["spam"] = "oneval"
        mapping["Spam"] = "twoval"
        mapping["sPAM"] = "redval"
        mapping["SPAM"] = "blueval"
        assert mapping["spam"] == "blueval"
        assert mapping["SPAM"] == "blueval"
        assert list(mapping.keys()) == ["SPAM"]

    def test_delitem(self):
        """Deleting with a differently cased key removes the entry."""
        mapping = CaseInsensitiveDict()
        mapping["Spam"] = "someval"
        del mapping["sPam"]
        assert "spam" not in mapping
        assert len(mapping) == 0

    def test_contains(self):
        """Membership checks ignore the case of the key."""
        mapping = CaseInsensitiveDict()
        mapping["Spam"] = "someval"
        for present_key in ("Spam", "spam", "SPAM", "sPam"):
            assert present_key in mapping
        assert "notspam" not in mapping

    def test_get(self):
        """``get`` ignores case and falls back to the default for missing keys."""
        mapping = CaseInsensitiveDict()
        mapping["spam"] = "oneval"
        mapping["SPAM"] = "blueval"
        for key in ("spam", "SPAM", "sPam"):
            assert mapping.get(key) == "blueval"
        assert mapping.get("notspam", "default") == "default"

    def test_update(self):
        """``update`` overwrites entries whose keys differ only in case."""
        single = CaseInsensitiveDict()
        single["spam"] = "blueval"
        single.update({"sPam": "notblueval"})
        assert single["spam"] == "notblueval"

        pair = CaseInsensitiveDict({"Foo": "foo", "BAr": "bar"})
        pair.update({"fOO": "anotherfoo", "bAR": "anotherbar"})
        assert len(pair) == 2
        assert pair["foo"] == "anotherfoo"
        assert pair["bar"] == "anotherbar"

    def test_update_retains_unchanged(self):
        """Keys not mentioned in ``update`` keep their values."""
        mapping = CaseInsensitiveDict({"foo": "foo", "bar": "bar"})
        mapping.update({"foo": "newfoo"})
        assert mapping["bar"] == "bar"

    def test_iter(self):
        """Iteration yields the keys with their original casing."""
        mapping = CaseInsensitiveDict({"Spam": "spam", "Eggs": "eggs"})
        expected_keys = frozenset(["Spam", "Eggs"])
        assert frozenset(iter(mapping)) == expected_keys

    def test_equality(self):
        """Equality ignores key case and holds against plain dicts too."""
        upper = CaseInsensitiveDict({"SPAM": "blueval", "Eggs": "redval"})
        lower = CaseInsensitiveDict({"spam": "blueval", "eggs": "redval"})
        assert upper == lower
        del lower["spam"]
        assert upper != lower
        assert upper == {"spam": "blueval", "eggs": "redval"}
        assert upper != object()

    def test_setdefault(self):
        """``setdefault`` returns the existing value for a case-variant key."""
        mapping = CaseInsensitiveDict({"Spam": "blueval"})
        assert mapping.setdefault("spam", "notblueval") == "blueval"
        assert mapping.setdefault("notspam", "notblueval") == "notblueval"

    def test_lower_items(self):
        """``lower_items`` yields lowercased keys."""
        mapping = CaseInsensitiveDict(
            {
                "Accept": "application/json",
                "user-Agent": "requests",
            }
        )
        lowered = frozenset(key for key, _value in mapping.lower_items())
        expected = frozenset(["accept", "user-agent"])
        assert lowered == expected

    def test_preserve_key_case(self):
        """``items``, ``keys`` and iteration all keep the original key case."""
        mapping = CaseInsensitiveDict(
            {
                "Accept": "application/json",
                "user-Agent": "requests",
            }
        )
        expected = frozenset(["Accept", "user-Agent"])
        assert frozenset(pair[0] for pair in mapping.items()) == expected
        assert frozenset(mapping.keys()) == expected
        assert frozenset(mapping) == expected

    def test_preserve_last_key_case(self):
        """The casing of the most recent assignment is the one reported."""
        mapping = CaseInsensitiveDict(
            {
                "Accept": "application/json",
                "user-Agent": "requests",
            }
        )
        mapping.update({"ACCEPT": "application/json"})
        mapping["USER-AGENT"] = "requests"
        expected = frozenset(["ACCEPT", "USER-AGENT"])
        assert frozenset(pair[0] for pair in mapping.items()) == expected
        assert frozenset(mapping.keys()) == expected
        assert frozenset(mapping) == expected

    def test_copy(self):
        """A copy compares equal until the original is modified."""
        original = CaseInsensitiveDict(
            {
                "Accept": "application/json",
                "user-Agent": "requests",
            }
        )
        duplicate = original.copy()
        assert original == duplicate
        original["changed"] = True
        assert original != duplicate
| 2454 | |
| 2455 | |
class TestMorselToCookieExpires:
    """Tests for morsel_to_cookie when morsel contains expires."""

    def test_expires_valid_str(self):
        """A well-formed date string is converted to the matching timestamp."""
        source = Morsel()
        source["expires"] = "Thu, 01-Jan-1970 00:00:01 GMT"
        converted = morsel_to_cookie(source)
        assert converted.expires == 1

    @pytest.mark.parametrize(
        "value, exception",
        (
            (100, TypeError),
            ("woops", ValueError),
        ),
    )
    def test_expires_invalid_int(self, value, exception):
        """An ``expires`` of the wrong type or format raises the given exception."""
        source = Morsel()
        source["expires"] = value
        with pytest.raises(exception):
            morsel_to_cookie(source)

    def test_expires_none(self):
        """An ``expires`` of ``None`` is carried over as ``None``."""
        source = Morsel()
        source["expires"] = None
        converted = morsel_to_cookie(source)
        assert converted.expires is None
| 2488 | |
| 2489 | |
class TestMorselToCookieMaxAge:
    """Tests for morsel_to_cookie when morsel contains max-age."""

    def test_max_age_valid_int(self):
        """A valid max age in seconds produces an integer ``expires``."""
        source = Morsel()
        source["max-age"] = 60
        converted = morsel_to_cookie(source)
        assert isinstance(converted.expires, int)

    def test_max_age_invalid_str(self):
        """An invalid, non-numeric max age raises ``TypeError``."""
        source = Morsel()
        source["max-age"] = "woops"
        with pytest.raises(TypeError):
            morsel_to_cookie(source)
| 2508 | |
| 2509 | |
| 2510 | class TestTimeout: |
| 2511 | def test_stream_timeout(self, httpbin): |
| 2512 | try: |
| 2513 | requests.get(httpbin("delay/10"), timeout=2.0) |
| 2514 | except requests.exceptions.Timeout as e: |
| 2515 | assert "Read timed out" in e.args[0].args[0] |
| 2516 | |
| 2517 | @pytest.mark.parametrize( |
| 2518 | "timeout, error_text", |
| 2519 | ( |
| 2520 | ((3, 4, 5), "(connect, read)"), |
| 2521 | ("foo", "must be an int, float or None"), |
| 2522 | ), |
| 2523 | ) |
| 2524 | def test_invalid_timeout(self, httpbin, timeout, error_text): |
| 2525 | with pytest.raises(ValueError) as e: |
| 2526 | requests.get(httpbin("get"), timeout=timeout) |
| 2527 | assert error_text in str(e) |
| 2528 | |
| 2529 | @pytest.mark.parametrize("timeout", (None, Urllib3Timeout(connect=None, read=None))) |
| 2530 | def test_none_timeout(self, httpbin, timeout): |
| 2531 | """Check that you can set None as a valid timeout value. |
| 2532 | |
| 2533 | To actually test this behavior, we'd want to check that setting the |
| 2534 | timeout to None actually lets the request block past the system default |
| 2535 | timeout. However, this would make the test suite unbearably slow. |
| 2536 | Instead we verify that setting the timeout to None does not prevent the |
| 2537 | request from succeeding. |
| 2538 | """ |
| 2539 | r = requests.get(httpbin("get"), timeout=timeout) |
| 2540 | assert r.status_code == 200 |
| 2541 | |
| 2542 | @pytest.mark.parametrize( |
| 2543 | "timeout", ((None, 0.1), Urllib3Timeout(connect=None, read=0.1)) |
| 2544 | ) |
| 2545 | def test_read_timeout(self, httpbin, timeout): |
| 2546 | try: |
| 2547 | requests.get(httpbin("delay/10"), timeout=timeout) |
| 2548 | pytest.fail("The recv() request should time out.") |
| 2549 | except ReadTimeout: |
| 2550 | pass |
| 2551 | |
| 2552 | @pytest.mark.parametrize( |
| 2553 | "timeout", ((0.1, None), Urllib3Timeout(connect=0.1, read=None)) |
| 2554 | ) |
| 2555 | def test_connect_timeout(self, timeout): |
| 2556 | try: |
| 2557 | requests.get(TARPIT, timeout=timeout) |
| 2558 | pytest.fail("The connect() request should time out.") |
| 2559 | except ConnectTimeout as e: |
| 2560 | assert isinstance(e, ConnectionError) |
| 2561 | assert isinstance(e, Timeout) |
| 2562 | |
| 2563 | @pytest.mark.parametrize( |
| 2564 | "timeout", ((0.1, 0.1), Urllib3Timeout(connect=0.1, read=0.1)) |
| 2565 | ) |
| 2566 | def test_total_timeout_connect(self, timeout): |
| 2567 | try: |
| 2568 | requests.get(TARPIT, timeout=timeout) |
| 2569 | pytest.fail("The connect() request should time out.") |
| 2570 | except ConnectTimeout: |
| 2571 | pass |
| 2572 | |
| 2573 | def test_encoded_methods(self, httpbin): |
| 2574 | """See: https://github.com/psf/requests/issues/2316""" |
| 2575 | r = requests.request(b"GET", httpbin("get")) |
| 2576 | assert r.ok |
| 2577 | |
| 2578 | |
# Record of one ``send`` invocation: its positional and keyword arguments.
SendCall = collections.namedtuple("SendCall", ["args", "kwargs"])
| 2580 | |
| 2581 | |
class RedirectSession(SessionRedirectMixin):
    """Session stand-in that answers ``send`` from a scripted status list.

    Every ``send`` call is recorded in ``calls`` as a ``SendCall`` and is
    answered with a response whose status code is taken from the front of
    ``redirects``; once that list is empty the status is 200.
    """

    def __init__(self, order_of_redirects):
        self.redirects = order_of_redirects
        self.calls = []
        self.max_redirects = 30
        self.cookies = {}
        self.trust_env = False

    def send(self, *args, **kwargs):
        """Record the call and return the next scripted response."""
        self.calls.append(SendCall(args, kwargs))
        return self.build_response()

    def build_response(self):
        """Build a response for the request of the most recent ``send``."""
        latest_request = self.calls[-1].args[0]

        # Consume the next scripted status; fall back to 200 when exhausted.
        try:
            status = int(self.redirects.pop(0))
        except IndexError:
            status = 200

        response = requests.Response()
        response.url = latest_request.url
        response.status_code = status
        response.headers = CaseInsensitiveDict({"Location": "/"})
        response.raw = self._build_raw()
        response.request = latest_request
        return response

    def _build_raw(self):
        """Return an empty text buffer with a stub ``release_conn`` attached."""
        raw = StringIO.StringIO("")
        raw.release_conn = lambda *args: args
        return raw
| 2613 | |
| 2614 | |
def test_json_encodes_as_bytes():
    """A ``json`` payload ends up as a bytes body on the prepared request."""
    # urllib3 expects bodies as bytes-like objects
    prepared = PreparedRequest()
    prepared.prepare(
        method="GET",
        url="https://www.example.com/",
        json={"key": "value"},
    )
    assert isinstance(prepared.body, bytes)
| 2621 | |
| 2622 | |
def test_requests_are_updated_each_time(httpbin):
    """Each followed redirect is sent as a GET with the default send kwargs."""
    session = RedirectSession([303, 307])
    prepared = requests.Request("POST", httpbin("post")).prepare()

    # The initial send keeps the POST method and passes no keyword arguments.
    first_response = session.send(prepared)
    assert first_response.request.method == "POST"
    assert session.calls[-1] == SendCall((first_response.request,), {})

    expected_kwargs = {
        "stream": False,
        "verify": True,
        "cert": None,
        "timeout": None,
        "allow_redirects": False,
        "proxies": {},
    }
    for hop in session.resolve_redirects(first_response, prepared):
        assert hop.request.method == "GET"
        assert session.calls[-1] == SendCall((hop.request,), expected_kwargs)
| 2642 | |
| 2643 | |
@pytest.mark.parametrize(
    "var,url,proxy",
    [
        ("http_proxy", "http://example.com", "socks5://proxy.com:9876"),
        ("https_proxy", "https://example.com", "socks5://proxy.com:9876"),
        ("all_proxy", "http://example.com", "socks5://proxy.com:9876"),
        ("all_proxy", "https://example.com", "socks5://proxy.com:9876"),
    ],
)
def test_proxy_env_vars_override_default(var, url, proxy):
    """A proxy set via an environment variable is used for the URL's scheme."""
    session = requests.Session()
    prepared = PreparedRequest()
    prepared.prepare(method="GET", url=url)
    scheme = urlparse(url).scheme

    with override_environ(**{var: proxy}):
        rebuilt = session.rebuild_proxies(prepared, {})
        assert scheme in rebuilt
        assert rebuilt[scheme] == proxy
| 2664 | |
| 2665 | |
@pytest.mark.parametrize(
    "data",
    [
        (("a", "b"), ("c", "d")),
        (("c", "d"), ("a", "b")),
        (("a", "b"), ("c", "d"), ("e", "f")),
    ],
)
def test_data_argument_accepts_tuples(data):
    """Tuples of string pairs given as ``data`` are url-encoded into the body."""
    prepared = PreparedRequest()
    prepared.prepare(
        method="GET",
        url="http://www.example.com",
        data=data,
        hooks=default_hooks(),
    )
    expected_body = urlencode(data)
    assert prepared.body == expected_body
| 2683 | |
| 2684 | |
@pytest.mark.parametrize(
    "kwargs",
    [
        None,
        {
            "method": "GET",
            "url": "http://www.example.com",
            "data": "foo=bar",
            "hooks": default_hooks(),
        },
        {
            "method": "GET",
            "url": "http://www.example.com",
            "data": "foo=bar",
            "hooks": default_hooks(),
            "cookies": {"foo": "bar"},
        },
        {"method": "GET", "url": "http://www.example.com/üniçø∂é"},
    ],
)
def test_prepared_copy(kwargs):
    """Copying a PreparedRequest keeps every compared attribute equal.

    With ``kwargs`` of None the request is copied without being prepared.
    """
    original = PreparedRequest()
    if kwargs:
        original.prepare(**kwargs)
    duplicate = original.copy()

    compared = ("method", "url", "headers", "_cookies", "body", "hooks")
    for name in compared:
        assert getattr(original, name) == getattr(duplicate, name)
| 2712 | |
| 2713 | |
def test_urllib3_retries(httpbin):
    """A 500 response in the retry force-list ends in a RetryError."""
    from urllib3.util import Retry

    retry_policy = Retry(total=2, status_forcelist=[500])
    session = requests.Session()
    session.mount("http://", HTTPAdapter(max_retries=retry_policy))

    with pytest.raises(RetryError):
        session.get(httpbin("status/500"))
| 2722 | |
| 2723 | |
def test_urllib3_pool_connection_closed(httpbin):
    """A ConnectionError from a zero-sized pool mentions the closed pool.

    NOTE(review): the test only inspects the error when one is raised; it
    passes unchanged if the request succeeds.
    """
    session = requests.Session()
    zero_pool_adapter = HTTPAdapter(pool_connections=0, pool_maxsize=0)
    session.mount("http://", zero_pool_adapter)

    try:
        session.get(httpbin("status/200"))
    except ConnectionError as error:
        assert "Pool is closed." in str(error)
| 2732 | |
| 2733 | |
| 2734 | class TestPreparingURLs: |
| 2735 | @pytest.mark.parametrize( |
| 2736 | "url,expected", |
| 2737 | ( |
| 2738 | ("http://google.com", "http://google.com/"), |
| 2739 | ("http://ジェーピーニック.jp", "http://xn--hckqz9bzb1cyrb.jp/"), |
| 2740 | ("http://xn--n3h.net/", "http://xn--n3h.net/"), |
| 2741 | ("http://ジェーピーニック.jp".encode(), "http://xn--hckqz9bzb1cyrb.jp/"), |
| 2742 | ("http://straße.de/straße", "http://xn--strae-oqa.de/stra%C3%9Fe"), |
| 2743 | ( |
| 2744 | "http://straße.de/straße".encode(), |
| 2745 | "http://xn--strae-oqa.de/stra%C3%9Fe", |
| 2746 | ), |
| 2747 | ( |
| 2748 | "http://Königsgäßchen.de/straße", |
| 2749 | "http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe", |
| 2750 | ), |
| 2751 | ( |
| 2752 | "http://Königsgäßchen.de/straße".encode(), |
| 2753 | "http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe", |
| 2754 | ), |
| 2755 | (b"http://xn--n3h.net/", "http://xn--n3h.net/"), |
| 2756 | ( |
| 2757 | b"http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/", |
| 2758 | "http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/", |
| 2759 | ), |
| 2760 | ( |
| 2761 | "http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/", |
| 2762 | "http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/", |
| 2763 | ), |
| 2764 | ), |
| 2765 | ) |
| 2766 | def test_preparing_url(self, url, expected): |
| 2767 | def normalize_percent_encode(x): |
| 2768 | # Helper function that normalizes equivalent |
| 2769 | # percent-encoded bytes before comparisons |
| 2770 | for c in re.findall(r"%[a-fA-F0-9]{2}", x): |
| 2771 | x = x.replace(c, c.upper()) |
| 2772 | return x |
| 2773 | |
| 2774 | r = requests.Request("GET", url=url) |
| 2775 | p = r.prepare() |
| 2776 | assert normalize_percent_encode(p.url) == expected |
| 2777 | |
| 2778 | @pytest.mark.parametrize( |
| 2779 | "url", |
| 2780 | ( |
| 2781 | b"http://*.google.com", |
| 2782 | b"http://*", |
| 2783 | "http://*.google.com", |
| 2784 | "http://*", |
| 2785 | "http://☃.net/", |
| 2786 | ), |
| 2787 | ) |
| 2788 | def test_preparing_bad_url(self, url): |
| 2789 | r = requests.Request("GET", url=url) |
| 2790 | with pytest.raises(requests.exceptions.InvalidURL): |
| 2791 | r.prepare() |
| 2792 | |
| 2793 | @pytest.mark.parametrize("url, exception", (("http://:1", InvalidURL),)) |
| 2794 | def test_redirecting_to_bad_url(self, httpbin, url, exception): |
| 2795 | with pytest.raises(exception): |
| 2796 | requests.get(httpbin("redirect-to"), params={"url": url}) |
| 2797 | |
| 2798 | @pytest.mark.parametrize( |
| 2799 | "input, expected", |
| 2800 | ( |
| 2801 | ( |
| 2802 | b"http+unix://%2Fvar%2Frun%2Fsocket/path%7E", |
| 2803 | "http+unix://%2Fvar%2Frun%2Fsocket/path~", |
| 2804 | ), |
| 2805 | ( |
| 2806 | "http+unix://%2Fvar%2Frun%2Fsocket/path%7E", |
| 2807 | "http+unix://%2Fvar%2Frun%2Fsocket/path~", |
| 2808 | ), |
| 2809 | ( |
| 2810 | b"mailto:user@example.org", |
| 2811 | "mailto:user@example.org", |
| 2812 | ), |
| 2813 | ( |
| 2814 | "mailto:user@example.org", |
| 2815 | "mailto:user@example.org", |
| 2816 | ), |
| 2817 | ( |
| 2818 | b"data:SSDimaUgUHl0aG9uIQ==", |
| 2819 | "data:SSDimaUgUHl0aG9uIQ==", |
| 2820 | ), |
| 2821 | ), |
| 2822 | ) |
| 2823 | def test_url_mutation(self, input, expected): |
| 2824 | """ |
| 2825 | This test validates that we correctly exclude some URLs from |
| 2826 | preparation, and that we handle others. Specifically, it tests that |
| 2827 | any URL whose scheme doesn't begin with "http" is left alone, and |
| 2828 | those whose scheme *does* begin with "http" are mutated. |
| 2829 | """ |
| 2830 | r = requests.Request("GET", url=input) |
| 2831 | p = r.prepare() |
| 2832 | assert p.url == expected |
| 2833 | |
| 2834 | @pytest.mark.parametrize( |
| 2835 | "input, params, expected", |
| 2836 | ( |
| 2837 | ( |
| 2838 | b"http+unix://%2Fvar%2Frun%2Fsocket/path", |
| 2839 | {"key": "value"}, |
| 2840 | "http+unix://%2Fvar%2Frun%2Fsocket/path?key=value", |
| 2841 | ), |
| 2842 | ( |
| 2843 | "http+unix://%2Fvar%2Frun%2Fsocket/path", |
| 2844 | {"key": "value"}, |
| 2845 | "http+unix://%2Fvar%2Frun%2Fsocket/path?key=value", |
| 2846 | ), |
| 2847 | ( |
| 2848 | b"mailto:user@example.org", |
| 2849 | {"key": "value"}, |
| 2850 | "mailto:user@example.org", |
| 2851 | ), |
| 2852 | ( |
| 2853 | "mailto:user@example.org", |
| 2854 | {"key": "value"}, |
| 2855 | "mailto:user@example.org", |
| 2856 | ), |
| 2857 | ), |
| 2858 | ) |
| 2859 | def test_parameters_for_nonstandard_schemes(self, input, params, expected): |
| 2860 | """ |
| 2861 | Setting parameters for nonstandard schemes is allowed if those schemes |
| 2862 | begin with "http", and is forbidden otherwise. |
| 2863 | """ |
| 2864 | r = requests.Request("GET", url=input, params=params) |
| 2865 | p = r.prepare() |
| 2866 | assert p.url == expected |
| 2867 | |
| 2868 | def test_post_json_nan(self, httpbin): |
| 2869 | data = {"foo": float("nan")} |
| 2870 | with pytest.raises(requests.exceptions.InvalidJSONError): |
| 2871 | requests.post(httpbin("post"), json=data) |
| 2872 | |
| 2873 | def test_json_decode_compatibility(self, httpbin): |
| 2874 | r = requests.get(httpbin("bytes/20")) |
| 2875 | with pytest.raises(requests.exceptions.JSONDecodeError) as excinfo: |
| 2876 | r.json() |
| 2877 | assert isinstance(excinfo.value, RequestException) |
| 2878 | assert isinstance(excinfo.value, JSONDecodeError) |
| 2879 | assert r.text not in str(excinfo.value) |
| 2880 | |
| 2881 | def test_json_decode_persists_doc_attr(self, httpbin): |
| 2882 | r = requests.get(httpbin("bytes/20")) |
| 2883 | with pytest.raises(requests.exceptions.JSONDecodeError) as excinfo: |
| 2884 | r.json() |
| 2885 | assert excinfo.value.doc == r.text |
| 2886 | |
| 2887 | def test_status_code_425(self): |
| 2888 | r1 = requests.codes.get("TOO_EARLY") |
| 2889 | r2 = requests.codes.get("too_early") |
| 2890 | r3 = requests.codes.get("UNORDERED") |
| 2891 | r4 = requests.codes.get("unordered") |
| 2892 | r5 = requests.codes.get("UNORDERED_COLLECTION") |
| 2893 | r6 = requests.codes.get("unordered_collection") |
| 2894 | |
| 2895 | assert r1 == 425 |
| 2896 | assert r2 == 425 |
| 2897 | assert r3 == 425 |
| 2898 | assert r4 == 425 |
| 2899 | assert r5 == 425 |
| 2900 | assert r6 == 425 |
| 2901 | |
| 2902 | def test_different_connection_pool_for_tls_settings_verify_True(self): |
| 2903 | def response_handler(sock): |
| 2904 | consume_socket_content(sock, timeout=0.5) |
| 2905 | sock.send( |
| 2906 | b"HTTP/1.1 200 OK\r\n" |
| 2907 | b"Content-Length: 18\r\n\r\n" |
| 2908 | b'\xff\xfe{\x00"\x00K0"\x00=\x00"\x00\xab0"\x00\r\n' |
| 2909 | ) |
| 2910 | |
| 2911 | s = requests.Session() |
| 2912 | close_server = threading.Event() |
| 2913 | server = TLSServer( |
| 2914 | handler=response_handler, |
| 2915 | wait_to_close_event=close_server, |
| 2916 | requests_to_handle=3, |
| 2917 | cert_chain="tests/certs/expired/server/server.pem", |
| 2918 | keyfile="tests/certs/expired/server/server.key", |
| 2919 | ) |
| 2920 | |
| 2921 | with server as (host, port): |
| 2922 | url = f"https://{host}:{port}" |
| 2923 | r1 = s.get(url, verify=False) |
| 2924 | assert r1.status_code == 200 |
| 2925 | |
| 2926 | # Cannot verify self-signed certificate |
| 2927 | with pytest.raises(requests.exceptions.SSLError): |
| 2928 | s.get(url) |
| 2929 | |
| 2930 | close_server.set() |
| 2931 | assert 2 == len(s.adapters["https://"].poolmanager.pools) |
| 2932 | |
| 2933 | def test_different_connection_pool_for_tls_settings_verify_bundle_expired_cert( |
| 2934 | self, |
| 2935 | ): |
| 2936 | def response_handler(sock): |
| 2937 | consume_socket_content(sock, timeout=0.5) |
| 2938 | sock.send( |
| 2939 | b"HTTP/1.1 200 OK\r\n" |
| 2940 | b"Content-Length: 18\r\n\r\n" |
| 2941 | b'\xff\xfe{\x00"\x00K0"\x00=\x00"\x00\xab0"\x00\r\n' |
| 2942 | ) |
| 2943 | |
| 2944 | s = requests.Session() |
| 2945 | close_server = threading.Event() |
| 2946 | server = TLSServer( |
| 2947 | handler=response_handler, |
| 2948 | wait_to_close_event=close_server, |
| 2949 | requests_to_handle=3, |
| 2950 | cert_chain="tests/certs/expired/server/server.pem", |
| 2951 | keyfile="tests/certs/expired/server/server.key", |
| 2952 | ) |
| 2953 | |
| 2954 | with server as (host, port): |
| 2955 | url = f"https://{host}:{port}" |
| 2956 | r1 = s.get(url, verify=False) |
| 2957 | assert r1.status_code == 200 |
| 2958 | |
| 2959 | # Has right trust bundle, but certificate expired |
| 2960 | with pytest.raises(requests.exceptions.SSLError): |
| 2961 | s.get(url, verify="tests/certs/expired/ca/ca.crt") |
| 2962 | |
| 2963 | close_server.set() |
| 2964 | assert 2 == len(s.adapters["https://"].poolmanager.pools) |
| 2965 | |
| 2966 | def test_different_connection_pool_for_tls_settings_verify_bundle_unexpired_cert( |
| 2967 | self, |
| 2968 | ): |
| 2969 | def response_handler(sock): |
| 2970 | consume_socket_content(sock, timeout=0.5) |
| 2971 | sock.send( |
| 2972 | b"HTTP/1.1 200 OK\r\n" |
| 2973 | b"Content-Length: 18\r\n\r\n" |
| 2974 | b'\xff\xfe{\x00"\x00K0"\x00=\x00"\x00\xab0"\x00\r\n' |
| 2975 | ) |
| 2976 | |
| 2977 | s = requests.Session() |
| 2978 | close_server = threading.Event() |
| 2979 | server = TLSServer( |
| 2980 | handler=response_handler, |
| 2981 | wait_to_close_event=close_server, |
| 2982 | requests_to_handle=3, |
| 2983 | cert_chain="tests/certs/valid/server/server.pem", |
| 2984 | keyfile="tests/certs/valid/server/server.key", |
| 2985 | ) |
| 2986 | |
| 2987 | with server as (host, port): |
| 2988 | url = f"https://{host}:{port}" |
| 2989 | r1 = s.get(url, verify=False) |
| 2990 | assert r1.status_code == 200 |
| 2991 | |
| 2992 | r2 = s.get(url, verify="tests/certs/valid/ca/ca.crt") |
| 2993 | assert r2.status_code == 200 |
| 2994 | |
| 2995 | close_server.set() |
| 2996 | assert 2 == len(s.adapters["https://"].poolmanager.pools) |
| 2997 | |
| 2998 | def test_different_connection_pool_for_mtls_settings(self): |
| 2999 | client_cert = None |
| 3000 | |
| 3001 | def response_handler(sock): |
| 3002 | nonlocal client_cert |
| 3003 | client_cert = sock.getpeercert() |
| 3004 | consume_socket_content(sock, timeout=0.5) |
| 3005 | sock.send( |
| 3006 | b"HTTP/1.1 200 OK\r\n" |
| 3007 | b"Content-Length: 18\r\n\r\n" |
| 3008 | b'\xff\xfe{\x00"\x00K0"\x00=\x00"\x00\xab0"\x00\r\n' |
| 3009 | ) |
| 3010 | |
| 3011 | s = requests.Session() |
| 3012 | close_server = threading.Event() |
| 3013 | server = TLSServer( |
| 3014 | handler=response_handler, |
| 3015 | wait_to_close_event=close_server, |
| 3016 | requests_to_handle=2, |
| 3017 | cert_chain="tests/certs/expired/server/server.pem", |
| 3018 | keyfile="tests/certs/expired/server/server.key", |
| 3019 | mutual_tls=True, |
| 3020 | cacert="tests/certs/expired/ca/ca.crt", |
| 3021 | ) |
| 3022 | |
| 3023 | cert = ( |
| 3024 | "tests/certs/mtls/client/client.pem", |
| 3025 | "tests/certs/mtls/client/client.key", |
| 3026 | ) |
| 3027 | with server as (host, port): |
| 3028 | url = f"https://{host}:{port}" |
| 3029 | r1 = s.get(url, verify=False, cert=cert) |
| 3030 | assert r1.status_code == 200 |
| 3031 | with pytest.raises(requests.exceptions.SSLError): |
| 3032 | s.get(url, cert=cert) |
| 3033 | close_server.set() |
| 3034 | |
| 3035 | assert client_cert is not None |
| 3036 | |
| 3037 | |
def test_content_length_for_bytes_data(httpbin):
    """Content-Length of a bytes body equals the number of bytes."""
    payload = "This is a string containing multi-byte UTF-8 ☃️".encode("utf-8")
    expected_length = str(len(payload))

    prepared = requests.Request("POST", httpbin("post"), data=payload).prepare()

    assert prepared.headers["Content-Length"] == expected_length
| 3046 | |
| 3047 | |
@pytest.mark.skipif(
    is_urllib3_1,
    reason="urllib3 2.x encodes all strings to utf-8, urllib3 1.x uses latin-1",
)
def test_content_length_for_string_data_counts_bytes(httpbin):
    """Content-Length of a str body is its UTF-8 byte count."""
    text = "This is a string containing multi-byte UTF-8 ☃️"
    expected_length = str(len(text.encode("utf-8")))

    prepared = requests.Request("POST", httpbin("post"), data=text).prepare()

    assert prepared.headers["Content-Length"] == expected_length
| 3059 | |
| 3060 | |
def test_json_decode_errors_are_serializable_deserializable():
    """A JSONDecodeError keeps the same repr after a pickle round trip."""
    original_error = requests.exceptions.JSONDecodeError(
        "Extra data",
        '{"responseCode":["706"],"data":null}{"responseCode":["706"],"data":null}',
        36,
    )
    pickled = pickle.dumps(original_error)
    restored_error = pickle.loads(pickled)
    assert repr(original_error) == repr(restored_error)
| 3069 |