← requests  /  tests/test_requests.py

1
"""Tests for Requests."""
2
3
import collections
4
import contextlib
5
import io
6
import json
7
import os
8
import pickle
9
import re
10
import tempfile
11
import threading
12
import warnings
13
from unittest import mock
14
15
import pytest
16
import urllib3
17
from urllib3.util import Timeout as Urllib3Timeout
18
19
import requests
20
from requests.adapters import HTTPAdapter
21
from requests.auth import HTTPDigestAuth, _basic_auth_str
22
from requests.compat import (
23
    JSONDecodeError,
24
    Morsel,
25
    MutableMapping,
26
    builtin_str,
27
    cookielib,
28
    getproxies,
29
    is_urllib3_1,
30
    urlparse,
31
)
32
from requests.cookies import cookiejar_from_dict, morsel_to_cookie
33
from requests.exceptions import (
34
    ChunkedEncodingError,
35
    ConnectionError,
36
    ConnectTimeout,
37
    ContentDecodingError,
38
    InvalidHeader,
39
    InvalidProxyURL,
40
    InvalidSchema,
41
    InvalidURL,
42
    MissingSchema,
43
    ProxyError,
44
    ReadTimeout,
45
    RequestException,
46
    RetryError,
47
    Timeout,
48
    TooManyRedirects,
49
    UnrewindableBodyError,
50
)
51
from requests.exceptions import SSLError as RequestsSSLError
52
from requests.hooks import default_hooks
53
from requests.models import PreparedRequest, urlencode
54
from requests.sessions import SessionRedirectMixin
55
from requests.structures import CaseInsensitiveDict
56
57
from . import SNIMissingWarning
58
from .compat import StringIO
59
from .testserver.server import TLSServer, consume_socket_content
60
from .utils import override_environ
61
62
# Requests to this URL should always fail with a connection timeout (nothing
63
# listening on that port)
64
TARPIT = "http://10.255.255.1"
65
66
# Used instead of TARPIT to avoid waiting for its connection timeout
67
INVALID_PROXY = "http://localhost:1"
68
69
# Probe for ssl.SSLContext; only the availability flag is kept, the imported
# name itself is discarded again.
try:
    from ssl import SSLContext
except ImportError:
    HAS_MODERN_SSL = False
else:
    del SSLContext
    HAS_MODERN_SSL = True
76
77
# True when the ``requests`` module exposes a ``pyopenssl`` attribute.
HAS_PYOPENSSL = hasattr(requests, "pyopenssl")
82
83
84
class TestRequests:
85
    digest_auth_algo = ("MD5", "SHA-256", "SHA-512")
86
87
    def test_entry_points(self):
        """Touch the public entry points; a missing one raises AttributeError."""
        requests.session
        # A fresh session is created for each method lookup.
        for verb in ("get", "head"):
            getattr(requests.session(), verb)
        for verb in ("get", "head", "put", "patch", "post"):
            getattr(requests, verb)
        # Not really an entry point, but people rely on it.
        from requests.packages.urllib3.poolmanager import PoolManager  # noqa:F401
98
99
    @pytest.mark.parametrize(
        "exception, url",
        [
            (MissingSchema, "hiwpefhipowhefopw"),
            (InvalidSchema, "localhost:3128"),
            (InvalidSchema, "localhost.localdomain:3128/"),
            (InvalidSchema, "10.122.1.1:3128/"),
            (InvalidURL, "http://"),
            (InvalidURL, "http://*example.com"),
            (InvalidURL, "http://.example.com"),
        ],
    )
    def test_invalid_url(self, exception, url):
        """``requests.get`` must raise the paired exception for each bad URL."""
        with pytest.raises(exception):
            requests.get(url)
114
115
    def test_basic_building(self):
        """A Request filled in attribute-by-attribute prepares its URL and body."""
        request = requests.Request()
        request.url = "http://kennethreitz.org/"
        request.data = {"life": "42"}

        prepared = request.prepare()

        assert prepared.url == request.url
        # The dict payload is form-encoded into the body.
        assert prepared.body == "life=42"
123
124
    @pytest.mark.parametrize("method", ["GET", "HEAD"])
    def test_no_content_length(self, httpbin, method):
        """A prepared GET/HEAD request without data has no Content-Length header."""
        target = httpbin(method.lower())
        prepared = requests.Request(method, target).prepare()
        assert "Content-Length" not in prepared.headers
128
129
    @pytest.mark.parametrize("method", ["POST", "PUT", "PATCH", "OPTIONS"])
    def test_no_body_content_length(self, httpbin, method):
        """Body-capable methods prepared without data get ``Content-Length: 0``."""
        target = httpbin(method.lower())
        prepared = requests.Request(method, target).prepare()
        assert prepared.headers["Content-Length"] == "0"
133
134
    @pytest.mark.parametrize("method", ["POST", "PUT", "PATCH", "OPTIONS"])
    def test_empty_content_length(self, httpbin, method):
        """An explicit empty-string body also yields ``Content-Length: 0``."""
        target = httpbin(method.lower())
        prepared = requests.Request(method, target, data="").prepare()
        assert prepared.headers["Content-Length"] == "0"
138
139
    def test_override_content_length(self, httpbin):
        """A caller-supplied Content-Length header survives preparation untouched."""
        custom = {"Content-Length": "not zero"}
        prepared = requests.Request(
            "POST", httpbin("post"), headers=custom
        ).prepare()

        assert "Content-Length" in prepared.headers
        assert prepared.headers["Content-Length"] == "not zero"
144
145
    def test_path_is_not_double_encoded(self):
        """A space in the path is percent-encoded exactly once in ``path_url``."""
        raw_url = "http://0.0.0.0/get/test case"
        prepared = requests.Request("GET", raw_url).prepare()
        assert prepared.path_url == "/get/test%20case"
149
150
    @pytest.mark.parametrize(
        "url, expected",
        [
            (
                "http://example.com/path#fragment",
                "http://example.com/path?a=b#fragment",
            ),
            (
                "http://example.com/path?key=value#fragment",
                "http://example.com/path?key=value&a=b#fragment",
            ),
        ],
    )
    def test_params_are_added_before_fragment(self, url, expected):
        """Extra params land in the query string, ahead of the URL fragment."""
        extra = {"a": "b"}
        prepared = requests.Request("GET", url, params=extra).prepare()
        assert prepared.url == expected
166
167
    def test_params_original_order_is_preserved_by_default(self):
        """Params from an OrderedDict appear in the URL in insertion order."""
        # Keys z, a, k, d (deliberately unsorted), each mapped to 1.
        ordered_params = collections.OrderedDict.fromkeys("zakd", 1)
        req = requests.Request("GET", "http://example.com/", params=ordered_params)

        prepared = requests.Session().prepare_request(req)

        assert prepared.url == "http://example.com/?z=1&a=1&k=1&d=1"
177
178
    def test_params_bytes_are_encoded(self):
        """A bytes ``params`` value is appended to the URL as a text query string."""
        req = requests.Request("GET", "http://example.com", params=b"test=foo")
        prepared = req.prepare()
        assert prepared.url == "http://example.com/?test=foo"
183
184
    def test_binary_put(self):
        """A bytes payload on PUT is kept as bytes in the prepared body."""
        payload = "ööö".encode()
        prepared = requests.Request("PUT", "http://example.com", data=payload).prepare()
        assert isinstance(prepared.body, bytes)
189
190
    def test_whitespaces_are_removed_from_url(self):
        """Leading whitespace in the URL is dropped during preparation."""
        # Test for issue #3696
        padded_url = " http://example.com"
        prepared = requests.Request("GET", padded_url).prepare()
        assert prepared.url == "http://example.com/"
194
195
    @pytest.mark.parametrize("scheme", ["http://", "HTTP://", "hTTp://", "HttP://"])
    def test_mixed_case_scheme_acceptable(self, httpbin, scheme):
        """Requests succeed regardless of the letter case used in the URL scheme."""
        session = requests.Session()
        session.proxies = getproxies()

        # Rebuild the httpbin URL with the parametrized scheme spelling.
        parsed = urlparse(httpbin("get"))
        url = f"{scheme}{parsed.netloc}{parsed.path}"

        response = session.send(requests.Request("GET", url).prepare())
        assert response.status_code == 200, f"failed for scheme {scheme}"
204
205
    def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin):
        """A hand-prepared GET sent through ``Session.send`` returns 200."""
        prepared = requests.Request("GET", httpbin("get")).prepare()

        session = requests.Session()
        session.proxies = getproxies()

        response = session.send(prepared)
        assert response.status_code == 200
213
214
    def test_HTTP_302_ALLOW_REDIRECT_GET(self, httpbin):
        """A single 302 is followed to a 200 and recorded in ``history``."""
        response = requests.get(httpbin("redirect", "1"))
        first_hop = response.history[0]

        assert response.status_code == 200
        assert first_hop.status_code == 302
        assert first_hop.is_redirect
219
220
    def test_redirect_history_no_self_reference(self, httpbin):
        """Each hop's own history holds only the hops before it, never itself."""
        final = requests.get(httpbin("redirect", "3"))
        assert final.status_code == 200

        hops = final.history
        assert len(hops) == 3
        for position, hop in enumerate(hops):
            assert hop not in hop.history
            assert hop.history == hops[:position]
227
228
    def test_HTTP_307_ALLOW_REDIRECT_POST(self, httpbin):
        """A POST following a 307 redirect still delivers its string body."""
        redirect_params = {"url": "post", "status_code": 307}
        response = requests.post(
            httpbin("redirect-to"), data="test", params=redirect_params
        )
        first_hop = response.history[0]

        assert response.status_code == 200
        assert first_hop.status_code == 307
        assert first_hop.is_redirect
        assert response.json()["data"] == "test"
238
239
    def test_HTTP_307_ALLOW_REDIRECT_POST_WITH_SEEKABLE(self, httpbin):
        """A seekable file-like body is delivered again after a 307 redirect."""
        payload = b"test"
        redirect_params = {"url": "post", "status_code": 307}
        response = requests.post(
            httpbin("redirect-to"), data=io.BytesIO(payload), params=redirect_params
        )
        first_hop = response.history[0]

        assert response.status_code == 200
        assert first_hop.status_code == 307
        assert first_hop.is_redirect
        assert response.json()["data"] == payload.decode("utf-8")
250
251
    def test_HTTP_302_TOO_MANY_REDIRECTS(self, httpbin):
        """A 50-hop redirect chain raises TooManyRedirects with 30 hops recorded."""
        try:
            requests.get(httpbin("relative-redirect", "50"))
        except TooManyRedirects as exc:
            # The chain counts down from 50, so after 30 hops it sits at 20.
            stopped_at = httpbin("relative-redirect", "20")
            assert exc.request.url == stopped_at
            assert exc.response.url == stopped_at
            assert len(exc.response.history) == 30
        else:
            pytest.fail("Expected redirect to raise TooManyRedirects but it did not")
261
262
    def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, httpbin):
        """A session's custom ``max_redirects`` caps the number of hops followed."""
        session = requests.session()
        session.max_redirects = 5
        try:
            session.get(httpbin("relative-redirect", "50"))
        except TooManyRedirects as exc:
            # Five hops down from 50 leaves the chain at 45.
            stopped_at = httpbin("relative-redirect", "45")
            assert exc.request.url == stopped_at
            assert exc.response.url == stopped_at
            assert len(exc.response.history) == 5
        else:
            pytest.fail(
                "Expected custom max number of redirects to be respected but was not"
            )
276
277
    def test_http_301_changes_post_to_get(self, httpbin):
        """After a 301, a POST is re-issued as GET."""
        response = requests.post(httpbin("status", "301"))
        first_hop = response.history[0]

        assert response.status_code == 200
        assert response.request.method == "GET"
        assert first_hop.status_code == 301
        assert first_hop.is_redirect
283
284
    def test_http_301_doesnt_change_head_to_get(self, httpbin):
        """After a 301, a HEAD request stays HEAD when redirects are followed."""
        # HEAD does not follow redirects by default, hence allow_redirects=True.
        r = requests.head(httpbin("status", "301"), allow_redirects=True)
        assert r.status_code == 200
        assert r.request.method == "HEAD"
        assert r.history[0].status_code == 301
        assert r.history[0].is_redirect
291
292
    def test_http_302_changes_post_to_get(self, httpbin):
        """After a 302, a POST is re-issued as GET."""
        response = requests.post(httpbin("status", "302"))
        first_hop = response.history[0]

        assert response.status_code == 200
        assert response.request.method == "GET"
        assert first_hop.status_code == 302
        assert first_hop.is_redirect
298
299
    def test_http_302_doesnt_change_head_to_get(self, httpbin):
        """After a 302, a HEAD request stays HEAD when redirects are followed."""
        response = requests.head(httpbin("status", "302"), allow_redirects=True)
        first_hop = response.history[0]

        assert response.status_code == 200
        assert response.request.method == "HEAD"
        assert first_hop.status_code == 302
        assert first_hop.is_redirect
305
306
    def test_http_303_changes_post_to_get(self, httpbin):
        """After a 303, a POST is re-issued as GET."""
        response = requests.post(httpbin("status", "303"))
        first_hop = response.history[0]

        assert response.status_code == 200
        assert response.request.method == "GET"
        assert first_hop.status_code == 303
        assert first_hop.is_redirect
312
313
    def test_http_303_doesnt_change_head_to_get(self, httpbin):
        """After a 303, a HEAD request stays HEAD when redirects are followed."""
        response = requests.head(httpbin("status", "303"), allow_redirects=True)
        first_hop = response.history[0]

        assert response.status_code == 200
        assert response.request.method == "HEAD"
        assert first_hop.status_code == 303
        assert first_hop.is_redirect
319
320
    def test_header_and_body_removal_on_redirect(self, httpbin):
        """A POST redirected via 302 loses its body and body-describing headers."""
        session = requests.Session()
        prepared = session.prepare_request(
            requests.Request("POST", httpbin("post"), data={"test": "data"})
        )
        response = session.send(prepared)

        # Mimic a redirect response
        response.status_code = 302
        response.headers["location"] = "get"

        # Run request through resolve_redirects
        followed = next(session.resolve_redirects(response, prepared))

        assert followed.request.body is None
        for name in ("Content-Length", "Content-Type"):
            assert name not in followed.request.headers
336
337
    def test_transfer_enc_removal_on_redirect(self, httpbin):
        """A generator-bodied POST drops Transfer-Encoding/Content-Type on a 302."""
        session = requests.Session()
        # A generator body makes the prepared request carry Transfer-Encoding.
        body = (b"x" for _ in range(1))
        prepared = session.prepare_request(
            requests.Request("POST", httpbin("post"), data=body)
        )
        assert "Transfer-Encoding" in prepared.headers

        # Create Response to avoid https://github.com/kevin1024/pytest-httpbin/issues/33
        response = requests.Response()
        response.raw = io.BytesIO(b"the content")
        response.request = prepared
        # BytesIO has no release_conn; supply a stub on the raw object.
        response.raw.release_conn = lambda *args: args

        # Mimic a redirect response
        response.status_code = 302
        response.headers["location"] = httpbin("get")

        # Run request through resolve_redirect
        followed = next(session.resolve_redirects(response, prepared))

        assert followed.request.body is None
        for name in ("Transfer-Encoding", "Content-Type"):
            assert name not in followed.request.headers
359
360
    def test_fragment_maintained_on_redirect(self, httpbin):
        """The URL fragment of the original request carries over to the final URL."""
        fragment = "#view=edit&token=hunter2"
        start_url = httpbin("redirect-to?url=get") + fragment

        response = requests.get(start_url)

        assert len(response.history) > 0
        assert response.history[0].request.url == start_url
        assert response.url == httpbin("get") + fragment
367
368
    def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin):
        """A custom User-agent header is echoed back by the user-agent endpoint."""
        custom_headers = {"User-agent": "Mozilla/5.0"}

        response = requests.get(httpbin("user-agent"), headers=custom_headers)

        assert custom_headers["User-agent"] in response.text
        assert response.status_code == 200
375
376
    def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self, httpbin):
        """A URL with an inline query plus a ``params`` dict still returns 200."""
        custom_headers = {"User-agent": "Mozilla/5.0"}
        url = httpbin("get") + "?test=true"

        response = requests.get(url, params={"q": "test"}, headers=custom_headers)

        assert response.status_code == 200
383
384
    def test_set_cookie_on_301(self, httpbin):
        """A cookie set through the cookies/set endpoint ends up in the session jar."""
        session = requests.session()
        session.get(httpbin("cookies/set?foo=bar"))
        assert session.cookies["foo"] == "bar"
389
390
    def test_cookie_sent_on_redirect(self, httpbin):
        """Session cookies are sent on the request that follows a redirect."""
        session = requests.session()
        session.get(httpbin("cookies/set?foo=bar"))

        # redirects to httpbin('get')
        response = session.get(httpbin("redirect/1"))

        echoed_headers = response.json()["headers"]
        assert "Cookie" in echoed_headers
395
396
    def test_cookie_removed_on_expire(self, httpbin):
        """A Set-Cookie with a past expiry removes the cookie from the session."""
        session = requests.session()
        session.get(httpbin("cookies/set?foo=bar"))
        assert session.cookies["foo"] == "bar"

        # Have the server answer with an already-expired cookie of the same name.
        expired = {"Set-Cookie": "foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT"}
        session.get(httpbin("response-headers"), params=expired)

        assert "foo" not in session.cookies
405
406
    def test_cookie_quote_wrapped(self, httpbin):
        """A quote-wrapped cookie value is stored with its quotes intact."""
        session = requests.session()
        url = httpbin('cookies/set?foo="bar:baz"')
        session.get(url)
        assert session.cookies["foo"] == '"bar:baz"'
410
411
    def test_cookie_persists_via_api(self, httpbin):
        """Per-request cookies are sent on both the first and the redirected request."""
        session = requests.session()
        response = session.get(httpbin("redirect/1"), cookies={"foo": "bar"})

        final_cookie = response.request.headers["Cookie"]
        initial_cookie = response.history[0].request.headers["Cookie"]
        assert "foo" in final_cookie
        assert "foo" in initial_cookie
416
417
    def test_request_cookie_overrides_session_cookie(self, httpbin):
        """A per-request cookie wins over the session's cookie of the same name."""
        session = requests.session()
        session.cookies["foo"] = "bar"

        response = session.get(httpbin("cookies"), cookies={"foo": "baz"})
        sent_cookies = response.json()["cookies"]

        assert sent_cookies["foo"] == "baz"
        # Session cookie should not be modified
        assert session.cookies["foo"] == "bar"
424
425
    def test_request_cookies_not_persisted(self, httpbin):
        """Cookies passed on a single request do not leak into the session jar."""
        session = requests.session()
        session.get(httpbin("cookies"), cookies={"foo": "baz"})
        # Sending a request with cookies should not add cookies to the session
        assert not session.cookies
430
431
    def test_generic_cookiejar_works(self, httpbin):
        """A plain ``cookielib.CookieJar`` can serve as the session's cookie jar."""
        jar = cookielib.CookieJar()
        cookiejar_from_dict({"foo": "bar"}, jar)

        session = requests.session()
        session.cookies = jar
        response = session.get(httpbin("cookies"))

        # Make sure the cookie was sent
        assert response.json()["cookies"]["foo"] == "bar"
        # Make sure the session cj is still the custom one
        assert session.cookies is jar
441
442
    def test_param_cookiejar_works(self, httpbin):
        """A plain ``cookielib.CookieJar`` can be passed as the ``cookies`` argument."""
        jar = cookielib.CookieJar()
        cookiejar_from_dict({"foo": "bar"}, jar)

        response = requests.session().get(httpbin("cookies"), cookies=jar)

        # Make sure the cookie was sent
        assert response.json()["cookies"]["foo"] == "bar"
449
450
    def test_cookielib_cookiejar_on_redirect(self, httpbin):
        """Tests resolve_redirect doesn't fail when merging cookies
        with non-RequestsCookieJar cookiejar.

        See GH #3579
        """
        request_jar = cookiejar_from_dict({"foo": "bar"}, cookielib.CookieJar())
        session = requests.Session()
        session.cookies = cookiejar_from_dict({"cookie": "tasty"})

        # Prepare request without using Session
        prepared = requests.Request("GET", httpbin("headers"), cookies=request_jar).prepare()

        # Send request and simulate redirect
        response = session.send(prepared)
        response.status_code = 302
        response.headers["location"] = httpbin("get")
        response = next(session.resolve_redirects(response, prepared))

        # Verify CookieJar isn't being converted to RequestsCookieJar
        redirected_jar = response.request._cookies
        assert isinstance(prepared._cookies, cookielib.CookieJar)
        assert isinstance(redirected_jar, cookielib.CookieJar)
        assert not isinstance(redirected_jar, requests.cookies.RequestsCookieJar)

        # Both the request's own cookie and the session's cookie must be present.
        merged = {cookie.name: cookie.value for cookie in redirected_jar}
        assert merged["foo"] == "bar"
        assert merged["cookie"] == "tasty"
481
482
    def test_requests_in_history_are_not_overridden(self, httpbin):
        """Each response in ``history`` keeps the request that produced it."""
        final = requests.get(httpbin("redirect/3"))
        # A hop's URL must match the URL of its own request, hop by hop.
        response_urls = [hop.url for hop in final.history]
        request_urls = [hop.request.url for hop in final.history]
        assert response_urls == request_urls
487
488
    def test_history_is_always_a_list(self, httpbin):
        """Show that even with redirects, Response.history is always a list."""
        direct = requests.get(httpbin("get"))
        assert isinstance(direct.history, list)

        redirected = requests.get(httpbin("redirect/1"))
        assert isinstance(redirected.history, list)
        assert not isinstance(redirected.history, tuple)
495
496
    def test_headers_on_session_with_None_are_not_sent(self, httpbin):
        """Do not send headers in Session.headers with None values."""
        session = requests.Session()
        session.headers["Accept-Encoding"] = None

        prepared = session.prepare_request(requests.Request("GET", httpbin("get")))

        assert "Accept-Encoding" not in prepared.headers
503
504
    def test_headers_preserve_order(self, httpbin):
        """Preserve order when headers provided as OrderedDict."""
        session = requests.Session()
        session.headers = collections.OrderedDict(
            [("Accept-Encoding", "identity"), ("First", "1"), ("Second", "2")]
        )
        # "Second" is last here: it overrides the session value but must keep
        # the session's position in the merged result.
        request_headers = collections.OrderedDict(
            [("Third", "3"), ("Fourth", "4"), ("Fifth", "5"), ("Second", "222")]
        )

        prepared = session.prepare_request(
            requests.Request("GET", httpbin("get"), headers=request_headers)
        )

        merged = list(prepared.headers.items())
        assert merged[:6] == [
            ("Accept-Encoding", "identity"),
            ("First", "1"),
            ("Second", "222"),
            ("Third", "3"),
            ("Fourth", "4"),
            ("Fifth", "5"),
        ]
523
524
    @pytest.mark.parametrize("key", ["User-agent", "user-agent"])
    def test_user_agent_transfers(self, httpbin, key):
        """The user agent is sent whichever letter case the header key uses."""
        agent = "Mozilla/5.0 (github.com/psf/requests)"

        response = requests.get(httpbin("user-agent"), headers={key: agent})

        assert agent in response.text
530
531
    def test_HTTP_200_OK_HEAD(self, httpbin):
        """A plain HEAD request returns 200."""
        response = requests.head(httpbin("get"))
        assert response.status_code == 200
534
535
    def test_HTTP_200_OK_PUT(self, httpbin):
        """A plain PUT request returns 200."""
        response = requests.put(httpbin("put"))
        assert response.status_code == 200
538
539
    def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin):
        """A (user, pass) tuple authenticates per request and per session."""
        credentials = ("user", "pass")
        url = httpbin("basic-auth", "user", "pass")

        # Credentials passed on the call itself.
        assert requests.get(url, auth=credentials).status_code == 200

        # No credentials at all.
        assert requests.get(url).status_code == 401

        # Credentials configured on the session.
        session = requests.session()
        session.auth = credentials
        assert session.get(url).status_code == 200
553
554
    @pytest.mark.parametrize(
        "username, password",
        [
            ("user", "pass"),
            ("имя".encode(), "пароль".encode()),
            (42, 42),
            (None, None),
        ],
    )
    def test_set_basicauth(self, httpbin, username, password):
        """The prepared Authorization header equals ``_basic_auth_str`` output."""
        request = requests.Request("GET", httpbin("get"), auth=(username, password))
        prepared = request.prepare()

        expected = _basic_auth_str(username, password)
        assert prepared.headers["Authorization"] == expected
571
572
    def test_basicauth_encodes_byte_strings(self):
        """Ensure b'test' formats as the byte string "test" rather
        than the unicode string "b'test'" in Python 3.
        """
        username, password = b"\xc5\xafsername", b"test\xc6\xb6"
        request = requests.Request(
            "GET", "http://localhost", auth=(username, password)
        )

        prepared = request.prepare()

        assert prepared.headers["Authorization"] == "Basic xa9zZXJuYW1lOnRlc3TGtg=="
581
582
    @pytest.mark.parametrize(
        "url, exception",
        [
            # Connecting to an unknown domain should raise a ConnectionError
            ("http://doesnotexist.google.com", ConnectionError),
            # Connecting to an invalid port should raise a ConnectionError
            ("http://localhost:1", ConnectionError),
            # Inputting a URL that cannot be parsed should raise an InvalidURL error
            ("http://fe80::5054:ff:fe5a:fc0", InvalidURL),
        ],
    )
    def test_errors(self, url, exception):
        """Each unreachable or unparsable URL raises its paired exception."""
        with pytest.raises(exception):
            requests.get(url, timeout=1)
596
597
    def test_proxy_error(self):
        """A request through a proxy that cannot be resolved raises ProxyError."""
        # any proxy related error (address resolution, no route to host, etc) should result in a ProxyError
        bad_proxies = {"http": "non-resolvable-address"}
        with pytest.raises(ProxyError):
            requests.get("http://localhost:1", proxies=bad_proxies)
603
604
    def test_proxy_error_on_bad_url(self, httpbin, httpbin_secure):
        """Malformed proxy URLs raise InvalidProxyURL for http and https targets."""
        # (fixture that builds the target URL, proxies mapping) pairs; the
        # target URL is built only when its case runs.
        cases = (
            (httpbin_secure, {"https": "http:/badproxyurl:3128"}),
            (httpbin, {"http": "http://:8080"}),
            (httpbin_secure, {"https": "https://"}),
            (httpbin, {"http": "http:///example.com:8080"}),
        )
        for make_url, proxies in cases:
            with pytest.raises(InvalidProxyURL):
                requests.get(make_url(), proxies=proxies)
616
617
    def test_respect_proxy_env_on_send_self_prepared_request(self, httpbin):
        """``Session.send`` honours ``http_proxy`` for a self-prepared request."""
        with override_environ(http_proxy=INVALID_PROXY), pytest.raises(ProxyError):
            session = requests.Session()
            prepared = requests.Request("GET", httpbin()).prepare()
            session.send(prepared)
623
624
    def test_respect_proxy_env_on_send_session_prepared_request(self, httpbin):
        """``Session.send`` honours ``http_proxy`` for a session-prepared request."""
        with override_environ(http_proxy=INVALID_PROXY), pytest.raises(ProxyError):
            session = requests.Session()
            prepared = session.prepare_request(requests.Request("GET", httpbin()))
            session.send(prepared)
631
632
    def test_respect_proxy_env_on_send_with_redirects(self, httpbin):
        """``Session.send`` honours ``http_proxy`` when the target URL redirects.

        With ``http_proxy`` pointing at INVALID_PROXY, sending a self-prepared
        request for a redirecting URL must raise ProxyError.
        """
        with override_environ(http_proxy=INVALID_PROXY):
            with pytest.raises(ProxyError):
                session = requests.Session()
                url = httpbin("redirect/1")
                request = requests.Request("GET", url)
                session.send(request.prepare())
640
641
    def test_respect_proxy_env_on_get(self, httpbin):
        """``Session.get`` honours the ``http_proxy`` environment variable."""
        with override_environ(http_proxy=INVALID_PROXY), pytest.raises(ProxyError):
            requests.Session().get(httpbin())
646
647
    def test_respect_proxy_env_on_request(self, httpbin):
        """``Session.request`` honours the ``http_proxy`` environment variable."""
        with override_environ(http_proxy=INVALID_PROXY), pytest.raises(ProxyError):
            requests.Session().request(method="GET", url=httpbin())
652
653
    def test_proxy_authorization_preserved_on_request(self, httpbin):
        """A Proxy-Authorization header set on the session reaches the server."""
        token = "Bearer XXX"
        session = requests.Session()
        session.headers.update({"Proxy-Authorization": token})

        response = session.request(method="GET", url=httpbin("get"))
        echoed_headers = response.json().get("headers", {})

        assert echoed_headers.get("Proxy-Authorization") == token
661
662
    @pytest.mark.parametrize(
        "url,has_proxy_auth",
        [
            ("http://example.com", True),
            ("https://example.com", False),
        ],
    )
    def test_proxy_authorization_not_appended_to_https_request(
        self, url, has_proxy_auth
    ):
        """``rebuild_proxies`` adds Proxy-Authorization for http URLs only."""
        proxies = {
            "http": "http://test:pass@localhost:8080",
            "https": "http://test:pass@localhost:8090",
        }
        prepared = requests.Request("GET", url).prepare()

        requests.Session().rebuild_proxies(prepared, proxies)

        header_present = "Proxy-Authorization" in prepared.headers
        assert header_present is has_proxy_auth
682
683
    def test_basicauth_with_netrc(self, httpbin):
        """netrc credentials are the fallback; explicitly given auth overrides them."""
        good_auth = ("user", "pass")
        bad_auth = ("wronguser", "wrongpass")
        url = httpbin("basic-auth", "user", "pass")

        def fake_netrc_auth(url):
            return good_auth

        # patch.object puts the real get_netrc_auth back on exit, even if an
        # assertion below fails.
        with mock.patch.object(requests.sessions, "get_netrc_auth", fake_netrc_auth):
            # Should use netrc and work.
            assert requests.get(url).status_code == 200

            # Given auth should override and fail.
            assert requests.get(url, auth=bad_auth).status_code == 401

            session = requests.session()

            # Should use netrc and work.
            assert session.get(url).status_code == 200

            # Given auth should override and fail.
            session.auth = bad_auth
            assert session.get(url).status_code == 401
717
718
    def test_basicauth_with_netrc_leak(self, httpbin):
        """Credentials from a different netrc ``machine`` entry must not be used.

        The request URL puts ``example.com:`` in the userinfo position ahead of
        the real httpbin host. The temporary netrc file holds wrong credentials
        for ``example.com`` and correct ones for the httpbin host, so the
        request only succeeds if the httpbin entry is the one applied.
        """
        url1 = httpbin("basic-auth", "user", "pass")
        url = url1[len("http://") :]
        domain = url.split(":")[0]
        url = f"http://example.com:@{url}"

        # delete=False: the file must outlive the ``with`` block so it can be
        # read during the request; it is unlinked in ``finally`` below.
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as fp:
            fp.write("machine example.com\n")
            fp.write("login wronguser\n")
            fp.write("password wrongpass\n")
            fp.write(f"machine {domain}\n")
            fp.write("login user\n")
            fp.write("password pass\n")
            netrc_file = fp.name

        # None (rather than "") marks "was not set", so the variable can be
        # removed again instead of being left behind as an empty string.
        old_netrc = os.environ.get("NETRC")
        os.environ["NETRC"] = netrc_file

        try:
            # Should use netrc
            # Make sure that we don't use the example.com credentials
            # for the request
            r = requests.get(url)
            assert r.status_code == 200
        finally:
            if old_netrc is None:
                os.environ.pop("NETRC", None)
            else:
                os.environ["NETRC"] = old_netrc
            os.unlink(netrc_file)
747
748
    def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
        """Digest auth succeeds per request and per session for every algorithm.

        Also checks that an unauthenticated request gets a 401 carrying a
        ``WWW-Authenticate`` header.
        """
        for authtype in self.digest_auth_algo:
            auth = HTTPDigestAuth("user", "pass")
            url = httpbin("digest-auth", "auth", "user", "pass", authtype, "never")

            r = requests.get(url, auth=auth)
            assert r.status_code == 200

            r = requests.get(url)
            assert r.status_code == 401
            # Explicit check in place of the former debug print of this header.
            assert "WWW-Authenticate" in r.headers

            s = requests.session()
            s.auth = HTTPDigestAuth("user", "pass")
            r = s.get(url)
            assert r.status_code == 200
764
765
    def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
        """The unauthenticated digest response sets the ``fake`` cookie."""
        for algorithm in self.digest_auth_algo:
            url = httpbin("digest-auth", "auth", "user", "pass", algorithm)
            digest = HTTPDigestAuth("user", "pass")

            challenge = requests.get(url)
            assert challenge.cookies["fake"] == "fake_value"

            authed = requests.get(url, auth=digest)
            assert authed.status_code == 200
774
775
    def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
        """Cookies set during the digest exchange end up in the session jar."""
        for algorithm in self.digest_auth_algo:
            url = httpbin("digest-auth", "auth", "user", "pass", algorithm)
            digest = HTTPDigestAuth("user", "pass")

            session = requests.Session()
            session.get(url, auth=digest)

            assert session.cookies["fake"] == "fake_value"
782
783
    def test_DIGEST_STREAM(self, httpbin):
        """With digest auth, ``raw`` has unread data only when ``stream=True``."""
        for algorithm in self.digest_auth_algo:
            digest = HTTPDigestAuth("user", "pass")
            url = httpbin("digest-auth", "auth", "user", "pass", algorithm)

            streamed = requests.get(url, auth=digest, stream=True)
            assert streamed.raw.read() != b""

            eager = requests.get(url, auth=digest, stream=False)
            assert eager.raw.read() == b""
793
794
    def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
        """A wrong digest password gives 401 per request and per session."""
        for algorithm in self.digest_auth_algo:
            wrong = HTTPDigestAuth("user", "wrongpass")
            url = httpbin("digest-auth", "auth", "user", "pass", algorithm)

            # Wrong credentials passed on the call itself.
            assert requests.get(url, auth=wrong).status_code == 401

            # No credentials at all.
            assert requests.get(url).status_code == 401

            # Wrong credentials configured on the session.
            session = requests.session()
            session.auth = wrong
            assert session.get(url).status_code == 401
809
810
    def test_DIGESTAUTH_QUOTES_QOP_VALUE(self, httpbin):
        """The Authorization header sent with digest auth contains a quoted ``auth``."""
        for algorithm in self.digest_auth_algo:
            response = requests.get(
                httpbin("digest-auth", "auth", "user", "pass", algorithm),
                auth=HTTPDigestAuth("user", "pass"),
            )
            sent_header = response.request.headers["Authorization"]
            assert '"auth"' in sent_header
817
818
    def test_POSTBIN_GET_POST_FILES(self, httpbin):
        """POST with no body, form data, a file upload and a raw string body."""
        endpoint = httpbin("post")
        requests.post(endpoint).raise_for_status()

        form_response = requests.post(endpoint, data={"some": "data"})
        assert form_response.status_code == 200

        with open("requirements-dev.txt") as upload:
            file_response = requests.post(endpoint, files={"some": upload})
        assert file_response.status_code == 200

        raw_response = requests.post(endpoint, data='[{"some": "json"}]')
        assert raw_response.status_code == 200

        # A list of plain strings as ``files`` is expected to raise ValueError.
        with pytest.raises(ValueError):
            requests.post(endpoint, files=["bad file data"])
834
835
    def test_invalid_files_input(self, httpbin):
        """A ``None`` file value is absent from the body; an integer value is present."""
        response = requests.post(
            httpbin("post"), files={"random-file-1": None, "random-file-2": 1}
        )
        sent_body = response.request.body
        assert b'name="random-file-1"' not in sent_body
        assert b'name="random-file-2"' in sent_body
840
841
    def test_POSTBIN_SEEKED_OBJECT_WITH_NO_ITER(self, httpbin):
        """A stream object without ``__iter__`` is posted from its current position."""

        class TestStream:
            """Minimal stream offering len/read/tell/seek and no ``__iter__``."""

            def __init__(self, data):
                self.data = data.encode()
                self.length = len(self.data)
                self.index = 0

            def __len__(self):
                return self.length

            def read(self, size=None):
                start = self.index
                if size:
                    chunk = self.data[start : start + size]
                    self.index = start + size
                    return chunk
                # No (or zero) size: return everything that is left.
                remainder = self.data[start:]
                self.index = self.length
                return remainder

            def tell(self):
                return self.index

            def seek(self, offset, where=0):
                # 0 = from start, 1 = from current position, 2 = from end;
                # any other ``where`` leaves the position untouched.
                bases = {0: 0, 1: self.index, 2: self.length}
                if where in bases:
                    self.index = bases[where] + offset

        whole = TestStream("test")
        full_post = requests.post(httpbin("post"), data=whole)
        assert full_post.status_code == 200
        assert full_post.json()["data"] == "test"

        seeked = TestStream("test")
        seeked.seek(2)
        partial_post = requests.post(httpbin("post"), data=seeked)
        assert partial_post.status_code == 200
        assert partial_post.json()["data"] == "st"
881
882
    def test_POSTBIN_GET_POST_FILES_WITH_DATA(self, httpbin):
        """POST form data together with a file upload, plus the plain variants."""
        endpoint = httpbin("post")
        requests.post(endpoint).raise_for_status()

        form_response = requests.post(endpoint, data={"some": "data"})
        assert form_response.status_code == 200

        with open("requirements-dev.txt") as upload:
            combined_response = requests.post(
                endpoint, data={"some": "data"}, files={"some": upload}
            )
        assert combined_response.status_code == 200

        raw_response = requests.post(endpoint, data='[{"some": "json"}]')
        assert raw_response.status_code == 200

        # A list of plain strings as ``files`` is expected to raise ValueError.
        with pytest.raises(ValueError):
            requests.post(endpoint, files=["bad file data"])
898
899
    def test_post_with_custom_mapping(self, httpbin):
        """A user-defined ``MutableMapping`` passed as ``data`` is echoed back as form data."""

        class CustomMapping(MutableMapping):
            """Thin ``MutableMapping`` wrapper around a plain dict."""

            def __init__(self, *args, **kwargs):
                self.data = dict(*args, **kwargs)

            def __getitem__(self, key):
                return self.data[key]

            def __setitem__(self, key, value):
                self.data[key] = value

            def __delitem__(self, key):
                del self.data[key]

            def __iter__(self):
                return iter(self.data)

            def __len__(self):
                return len(self.data)

        payload = CustomMapping({"some": "data"})
        response = requests.post(httpbin("post"), data=payload)
        echoed_form = response.json().get("form")
        assert echoed_form == {"some": "data"}
923
924
    def test_conflicting_post_params(self, httpbin):
        """Passing a string body together with ``files`` raises ValueError."""
        endpoint = httpbin("post")
        with open("requirements-dev.txt") as upload, pytest.raises(ValueError):
            requests.post(endpoint, data='[{"some": "data"}]', files={"some": upload})
929
930
    def test_request_ok_set(self, httpbin):
        """A 404 response is not ``ok``."""
        not_found = requests.get(httpbin("status", "404"))
        assert not not_found.ok
933
934
    def test_status_raising(self, httpbin):
        """``raise_for_status`` raises HTTPError on 404, and a 500 is not ``ok``."""
        not_found = requests.get(httpbin("status", "404"))
        with pytest.raises(requests.exceptions.HTTPError):
            not_found.raise_for_status()

        server_error = requests.get(httpbin("status", "500"))
        assert not server_error.ok
941
942
    def test_decompress_gzip(self, httpbin):
        """The content of the gzip endpoint decodes as ASCII without raising."""
        body = requests.get(httpbin("gzip")).content
        body.decode("ascii")
945
946
    @pytest.mark.parametrize(
        "url, params",
        (
            ("/get", {"foo": "føø"}),
            ("/get", {"føø": "føø"}),
            ("/get", {"foo": "foo"}),
            ("ø", {"foo": "foo"}),
        ),
    )
    def test_unicode_get(self, httpbin, url, params):
        """GET with non-ASCII characters in the path or query parameters.

        There are no assertions: the test passes as long as the request
        does not raise.
        """
        requests.get(httpbin(url), params=params)
958
959
    def test_unicode_header_name(self, httpbin):
        """PUT a non-ASCII ``str`` body with an explicit Content-Type header."""
        content_type = {"Content-Type": "application/octet-stream"}
        # compat.str is unicode.
        requests.put(httpbin("put"), headers=content_type, data="\xff")
965
966
    def test_pyopenssl_redirect(self, httpbin_secure, httpbin_ca_bundle):
        """GET the HTTPS 301 status endpoint, verifying against the test CA bundle."""
        redirect_url = httpbin_secure("status", "301")
        requests.get(redirect_url, verify=httpbin_ca_bundle)
968
969
    def test_invalid_ca_certificate_path(self, httpbin_secure):
        """A non-existent ``verify`` path raises IOError with a descriptive message."""
        INVALID_PATH = "/garbage"
        expected_message = (
            f"Could not find a suitable TLS CA certificate bundle, invalid path: {INVALID_PATH}"
        )
        with pytest.raises(IOError) as excinfo:
            requests.get(httpbin_secure(), verify=INVALID_PATH)
        assert str(excinfo.value) == expected_message
977
978
    def test_invalid_ssl_certificate_files(self, httpbin_secure):
        """Non-existent client certificate or key paths raise IOError."""
        INVALID_PATH = "/garbage"

        # ``cert`` given as a single path: the certificate file is reported.
        with pytest.raises(IOError) as cert_error:
            requests.get(httpbin_secure(), cert=INVALID_PATH)
        missing_cert = (
            f"Could not find the TLS certificate file, invalid path: {INVALID_PATH}"
        )
        assert str(cert_error.value) == missing_cert

        # ``cert`` given as (cert, key) with a bad key path: the key file is reported.
        with pytest.raises(IOError) as key_error:
            requests.get(httpbin_secure(), cert=(".", INVALID_PATH))
        missing_key = f"Could not find the TLS key file, invalid path: {INVALID_PATH}"
        assert str(key_error.value) == missing_key
992
993
    @pytest.mark.parametrize(
        "env, expected",
        (
            ({}, True),
            ({"REQUESTS_CA_BUNDLE": "/some/path"}, "/some/path"),
            ({"REQUESTS_CA_BUNDLE": ""}, True),
            ({"CURL_CA_BUNDLE": "/some/path"}, "/some/path"),
            ({"CURL_CA_BUNDLE": ""}, True),
            ({"REQUESTS_CA_BUNDLE": "", "CURL_CA_BUNDLE": ""}, True),
            (
                {
                    "REQUESTS_CA_BUNDLE": "/some/path",
                    "CURL_CA_BUNDLE": "/curl/path",
                },
                "/some/path",
            ),
            (
                {
                    "REQUESTS_CA_BUNDLE": "",
                    "CURL_CA_BUNDLE": "/curl/path",
                },
                "/curl/path",
            ),
        ),
    )
    def test_env_cert_bundles(self, httpbin, env, expected):
        """``verify`` is resolved from the CA-bundle environment variables.

        ``os.environ`` is replaced by ``env`` while the session merges its
        settings; the resulting ``verify`` value must equal ``expected``.
        """
        session = requests.Session()
        with mock.patch("os.environ", env):
            merged = session.merge_environment_settings(
                url=httpbin("get"), proxies={}, stream=False, verify=True, cert=None
            )
        resolved_verify = merged["verify"]
        assert resolved_verify == expected
1025
1026
    def test_http_with_certificate(self, httpbin):
        """Supplying ``cert`` on a plain-HTTP request still returns 200."""
        response = requests.get(httpbin(), cert=".")
        assert response.status_code == 200
1029
1030
    @pytest.mark.skipif(
        SNIMissingWarning is None,
        reason="urllib3 2.0 removed that warning and errors out instead",
    )
    def test_https_warnings(self, nosan_server):
        """warnings are emitted with requests.get"""
        _host, port, ca_bundle = nosan_server

        warnings_expected = (
            "SNIMissingWarning",
            "InsecurePlatformWarning",
            "SubjectAltNameWarning",
        )
        if HAS_MODERN_SSL or HAS_PYOPENSSL:
            warnings_expected = ("SubjectAltNameWarning",)

        with pytest.warns() as recorded:
            warnings.simplefilter("always")
            requests.get(f"https://localhost:{port}/", verify=ca_bundle)

        # ResourceWarning entries are ignored; only the remaining category
        # names, in emission order, are compared.
        observed = tuple(
            record.category.__name__
            for record in recorded
            if record.category.__name__ != "ResourceWarning"
        )
        assert observed == warnings_expected
1058
1059
    def test_certificate_failure(self, httpbin_secure):
        """
        An SSLError is raised when the underlying SSL handshake has problems.
        """
        target = httpbin_secure("status", "200")
        # Our local httpbin does not have a trusted CA, so this call will
        # fail if we use our default trust bundle.
        with pytest.raises(RequestsSSLError):
            requests.get(target)
1067
1068
    def test_urlencoded_get_query_multivalued_param(self, httpbin):
        """A list-valued query parameter is expanded into repeated keys in the URL."""
        query = {"test": ["foo", "baz"]}
        response = requests.get(httpbin("get"), params=query)
        assert response.status_code == 200
        assert response.url == httpbin("get?test=foo&test=baz")
1072
1073
    def test_form_encoded_post_query_multivalued_element(self, httpbin):
        """A list-valued form field is encoded as repeated keys in the body."""
        request = requests.Request(
            method="POST", url=httpbin("post"), data={"test": ["foo", "baz"]}
        )
        prepared = request.prepare()
        assert prepared.body == "test=foo&test=baz"
1079
1080
    def test_different_encodings_dont_break_post(self, httpbin):
        """Form data, query parameters and a binary file upload can be combined."""
        form = {"stuff": json.dumps({"a": 123})}
        query = {"blah": "asdf1234"}
        with open(__file__, "rb") as source:
            response = requests.post(
                httpbin("post"),
                data=form,
                params=query,
                files={"file": ("test_requests.py", source)},
            )
        assert response.status_code == 200
1089
1090
    @pytest.mark.parametrize(
        "data",
        (
            {"stuff": "ëlïxr"},
            {"stuff": "ëlïxr".encode()},
            {"stuff": "elixr"},
            {"stuff": b"elixr"},
        ),
    )
    def test_unicode_multipart_post(self, httpbin, data):
        """Multipart POST succeeds for str and bytes field values, ASCII or not."""
        with open(__file__, "rb") as source:
            upload = {"file": ("test_requests.py", source)}
            response = requests.post(httpbin("post"), data=data, files=upload)
        assert response.status_code == 200
1107
1108
    def test_unicode_multipart_post_fieldnames(self, httpbin):
        """A bytes field name is emitted as plain ``stuff``, not as its ``repr``."""
        stem, _extension = os.path.splitext(__file__)
        source_path = stem + ".py"
        with open(source_path, "rb") as source:
            request = requests.Request(
                method="POST",
                url=httpbin("post"),
                data={b"stuff": "elixr"},
                files={"file": ("test_requests.py", source)},
            )
            prepared = request.prepare()

        encoded_body = prepared.body
        assert b'name="stuff"' in encoded_body
        assert b"name=\"b'stuff'\"" not in encoded_body
1121
1122
    def test_unicode_method_name(self, httpbin):
        """``requests.request`` with a ``str`` method name and a file upload returns 200."""
        with open(__file__, "rb") as source:
            response = requests.request(
                method="POST",
                url=httpbin("post"),
                files={"file": source},
            )
        assert response.status_code == 200
1131
1132
    def test_unicode_method_name_with_request_object(self, httpbin):
        """A session-prepared request keeps its method as a native string and sends fine."""
        session = requests.Session()
        with open(__file__, "rb") as source:
            request = requests.Request(
                "POST", httpbin("post"), files={"file": source}
            )
            prepared = session.prepare_request(request)
        assert isinstance(prepared.method, builtin_str)
        assert prepared.method == "POST"

        response = session.send(prepared)
        assert response.status_code == 200
1143
1144
    def test_non_prepared_request_error(self):
        """Sending an unprepared ``Request`` raises ValueError with a fixed message."""
        session = requests.Session()
        unprepared = requests.Request("POST", "/")

        with pytest.raises(ValueError) as excinfo:
            session.send(unprepared)
        message = str(excinfo.value)
        assert message == "You can only send PreparedRequests."
1151
1152
    def test_custom_content_type(self, httpbin):
        """A per-file content type given in the ``files`` tuple appears in the body."""
        form = {"stuff": json.dumps({"a": 123})}
        with open(__file__, "rb") as first, open(__file__, "rb") as second:
            uploads = {
                "file1": ("test_requests.py", first),
                "file2": ("test_requests", second, "text/py-content-type"),
            }
            response = requests.post(httpbin("post"), data=form, files=uploads)
        assert response.status_code == 200
        assert b"text/py-content-type" in response.request.body
1163
1164
    def test_hook_receives_request_arguments(self, httpbin):
        """The response hook is handed a response plus non-empty keyword arguments."""

        def hook(resp, **kwargs):
            assert resp is not None
            assert kwargs != {}

        session = requests.Session()
        request = requests.Request("GET", httpbin(), hooks={"response": hook})
        session.send(session.prepare_request(request))
1173
1174
    def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
        """Without request-level hooks the prepared request gets the session's hooks."""

        def hook(*args, **kwargs):
            pass

        session = requests.Session()
        session.hooks["response"].append(hook)

        prepared = session.prepare_request(requests.Request("GET", httpbin()))
        response_hooks = prepared.hooks["response"]
        assert response_hooks != []
        assert response_hooks == [hook]
1184
1185
    def test_session_hooks_are_overridden_by_request_hooks(self, httpbin):
        """Request-level response hooks replace the session's response hooks."""

        def hook1(*args, **kwargs):
            pass

        def hook2(*args, **kwargs):
            pass

        assert hook1 is not hook2

        session = requests.Session()
        session.hooks["response"].append(hook2)

        request = requests.Request("GET", httpbin(), hooks={"response": [hook1]})
        prepared = session.prepare_request(request)
        assert prepared.hooks["response"] == [hook1]
1198
1199
    def test_prepared_request_hook(self, httpbin):
        """A hook attached before ``prepare()`` runs when the request is sent."""

        def hook(resp, **kwargs):
            # Tag the response so the test can tell the hook ran.
            resp.hook_working = True
            return resp

        prepared = requests.Request(
            "GET", httpbin(), hooks={"response": hook}
        ).prepare()

        session = requests.Session()
        session.proxies = getproxies()
        response = session.send(prepared)

        assert hasattr(response, "hook_working")
1212
1213
    def test_prepared_from_session(self, httpbin):
        """Session-level auth is applied to requests prepared through the session."""

        class DummyAuth(requests.auth.AuthBase):
            """Auth handler that stamps a marker header on the request."""

            def __call__(self, r):
                r.headers["Dummy-Auth-Test"] = "dummy-auth-test-ok"
                return r

        request = requests.Request("GET", httpbin("headers"))
        assert not request.auth

        session = requests.Session()
        session.auth = DummyAuth()

        response = session.send(session.prepare_request(request))

        echoed_headers = response.json()["headers"]
        assert echoed_headers["Dummy-Auth-Test"] == "dummy-auth-test-ok"
1229
1230
    def test_prepare_request_with_bytestring_url(self):
        """A bytes URL is converted to ``str`` when the session prepares the request."""
        session = requests.Session()
        request = requests.Request("GET", b"https://httpbin.org/")
        prepared = session.prepare_request(request)
        assert prepared.url == "https://httpbin.org/"
1235
1236
    def test_request_with_bytestring_host(self, httpbin):
        """A bytes ``Host`` header does not prevent cookies from being read back."""
        session = requests.Session()
        response = session.request(
            "GET",
            httpbin("cookies/set?cookie=value"),
            allow_redirects=False,
            headers={"Host": b"httpbin.org"},
        )
        cookie_value = response.cookies.get("cookie")
        assert cookie_value == "value"
1245
1246
    def test_links(self):
        """``Response.links`` parses the ``link`` header into entries keyed by ``rel``."""
        link_header = (
            "<https://api.github.com/users/kennethreitz/repos?"
            'page=2&per_page=10>; rel="next", <https://api.github.'
            "com/users/kennethreitz/repos?page=7&per_page=10>; "
            ' rel="last"'
        )
        response = requests.Response()
        response.headers = {
            "cache-control": "public, max-age=60, s-maxage=60",
            "connection": "keep-alive",
            "content-encoding": "gzip",
            "content-type": "application/json; charset=utf-8",
            "date": "Sat, 26 Jan 2013 16:47:56 GMT",
            "etag": '"6ff6a73c0e446c1f61614769e3ceb778"',
            "last-modified": "Sat, 26 Jan 2013 16:22:39 GMT",
            "link": link_header,
            "server": "GitHub.com",
            "status": "200 OK",
            "vary": "Accept",
            "x-content-type-options": "nosniff",
            "x-github-media-type": "github.beta",
            "x-ratelimit-limit": "60",
            "x-ratelimit-remaining": "57",
        }
        next_link = response.links["next"]
        assert next_link["rel"] == "next"
1271
1272
    def test_cookie_parameters(self):
        """``secure``, ``domain`` and ``rest`` passed to ``jar.set`` land on the cookie."""
        extra = {"HttpOnly": True}

        jar = requests.cookies.RequestsCookieJar()
        jar.set("some_cookie", "some_value", secure=True, domain="test.com", rest=extra)

        assert len(jar) == 1
        assert "some_cookie" in jar

        stored = next(iter(jar))
        assert stored.secure == True  # noqa: E712 - equality, as originally asserted
        assert stored.domain == "test.com"
        assert stored._rest["HttpOnly"] == extra["HttpOnly"]
1289
1290
    def test_cookie_as_dict_keeps_len(self):
        """Converting the jar to a dict, by any route, keeps both cookies."""
        jar = requests.cookies.RequestsCookieJar()
        jar.set("some_cookie", "some_value")
        jar.set("some_cookie1", "some_value1")

        conversions = (
            dict(jar),
            dict(jar.iteritems()),
            dict(jar.items()),
        )

        assert len(jar) == 2
        for converted in conversions:
            assert len(converted) == 2
1309
1310
    def test_cookie_as_dict_keeps_items(self):
        """Converting the jar to a dict, by any route, keeps names mapped to values."""
        jar = requests.cookies.RequestsCookieJar()
        jar.set("some_cookie", "some_value")
        jar.set("some_cookie1", "some_value1")

        from_jar = dict(jar)
        from_iteritems = dict(jar.iteritems())
        from_items = dict(jar.items())

        assert from_jar["some_cookie"] == "some_value"
        assert from_iteritems["some_cookie"] == "some_value"
        assert from_items["some_cookie1"] == "some_value1"
1328
1329
    def test_cookie_as_dict_keys(self):
        """``jar.keys()`` compares equal to a list and can be consumed repeatedly."""
        jar = requests.cookies.RequestsCookieJar()
        for name, content in (
            ("some_cookie", "some_value"),
            ("some_cookie1", "some_value1"),
        ):
            jar.set(name, content)

        names = jar.keys()
        assert names == list(names)
        # make sure one can use keys multiple times
        first_pass, second_pass = list(names), list(names)
        assert first_pass == second_pass
1344
1345
    def test_cookie_as_dict_values(self):
        """``jar.values()`` compares equal to a list and can be consumed repeatedly."""
        jar = requests.cookies.RequestsCookieJar()
        for name, content in (
            ("some_cookie", "some_value"),
            ("some_cookie1", "some_value1"),
        ):
            jar.set(name, content)

        contents = jar.values()
        assert contents == list(contents)
        # make sure one can use values multiple times
        first_pass, second_pass = list(contents), list(contents)
        assert first_pass == second_pass
1360
1361
    def test_cookie_as_dict_items(self):
        """``jar.items()`` compares equal to a list and can be consumed repeatedly."""
        jar = requests.cookies.RequestsCookieJar()
        for name, content in (
            ("some_cookie", "some_value"),
            ("some_cookie1", "some_value1"),
        ):
            jar.set(name, content)

        pairs = jar.items()
        assert pairs == list(pairs)
        # make sure one can use items multiple times
        first_pass, second_pass = list(pairs), list(pairs)
        assert first_pass == second_pass
1376
1377
    def test_cookie_duplicate_names_different_domains(self):
        """The same cookie name on two domains is stored twice and needs a domain to read."""
        name, content = "some_cookie", "some_value"
        first_domain, second_domain = "test1.com", "test2.com"

        jar = requests.cookies.RequestsCookieJar()
        for domain in (first_domain, second_domain):
            jar.set(name, content, domain=domain)
        assert name in jar
        assert len(jar.items()) == 2

        # Verify that CookieConflictError is raised if domain is not specified
        with pytest.raises(requests.cookies.CookieConflictError):
            jar.get(name)

        # Verify that CookieConflictError is not raised if domain is specified
        assert jar.get(name, domain=first_domain) == content
1397
1398
    def test_cookie_duplicate_names_raises_cookie_conflict_error(self):
        """The same cookie name set with and without a path makes ``get`` ambiguous."""
        name, content = "some_cookie", "some_value"

        jar = requests.cookies.RequestsCookieJar()
        jar.set(name, content, path="some_path")
        jar.set(name, content)

        with pytest.raises(requests.cookies.CookieConflictError):
            jar.get(name)
1408
1409
    def test_cookie_policy_copy(self):
        """Copying a jar carries over its custom cookie policy type."""

        class MyCookiePolicy(cookielib.DefaultCookiePolicy):
            pass

        jar = requests.cookies.RequestsCookieJar()
        jar.set_policy(MyCookiePolicy())

        copied_policy = jar.copy().get_policy()
        assert isinstance(copied_policy, MyCookiePolicy)
1416
1417
    def test_time_elapsed_blank(self, httpbin):
        """A completed request reports a strictly positive ``elapsed`` duration."""
        r = requests.get(httpbin("get"))
        # ``timedelta.total_seconds()`` folds days, seconds and microseconds
        # into one float, so no manual arithmetic is needed.
        assert r.elapsed.total_seconds() > 0.0
1424
1425
    def test_empty_response_has_content_none(self):
        """A freshly constructed ``Response`` has ``content`` equal to ``None``."""
        blank = requests.Response()
        assert blank.content is None
1428
1429
    def test_response_is_iterable(self):
        """Iterating a ``Response`` yields data read from its ``raw`` stream.

        The stream's ``read`` is replaced by a wrapper that accepts (and
        ignores) a ``decode_content`` keyword before delegating to the real
        ``read``.
        """
        r = requests.Response()
        # Named ``stream`` rather than ``io`` so the module-level ``io``
        # import is not shadowed inside this test.
        stream = StringIO.StringIO("abc")
        try:
            read_ = stream.read

            def read_mock(amt, decode_content=None):
                return read_(amt)

            stream.read = read_mock
            r.raw = stream
            assert next(iter(r))
        finally:
            # Close the stream even when the assertion above fails.
            stream.close()
1441
1442
    def test_response_decode_unicode(self):
        """``Response.iter_content(decode_unicode=True)`` must yield ``str``
        chunks, both for already-consumed content and for a streamed body.
        """
        consumed = requests.Response()
        consumed._content_consumed = True
        consumed._content = b"the content"
        consumed.encoding = "ascii"

        for chunk in consumed.iter_content(decode_unicode=True):
            assert isinstance(chunk, str)

        # also for streaming
        streamed = requests.Response()
        streamed.raw = io.BytesIO(b"the content")
        streamed.encoding = "ascii"

        for chunk in streamed.iter_content(decode_unicode=True):
            assert isinstance(chunk, str)
1460
1461
    def test_response_reason_unicode(self):
        """Reading ``ok`` works when ``reason`` holds encoded non-ASCII bytes."""
        # check for unicode HTTP status
        response = requests.Response()
        response.url = "unicode URL"
        response.reason = "Komponenttia ei löydy".encode()
        response.status_code = 404
        response.encoding = None
        assert not response.ok  # old behaviour - crashes here
1469
1470
    def test_response_reason_unicode_fallback(self):
        """A latin-1 encoded ``reason`` shows up decoded in the HTTPError message."""
        # check raise_status falls back to ISO-8859-1
        reason = "Komponenttia ei löydy"

        response = requests.Response()
        response.url = "some url"
        response.reason = reason.encode("latin-1")
        response.status_code = 500
        response.encoding = None

        with pytest.raises(requests.exceptions.HTTPError) as excinfo:
            response.raise_for_status()
        error_message = excinfo.value.args[0]
        assert reason in error_message
1481
1482
    def test_response_chunk_size_type(self):
        """``iter_content`` accepts an integer or ``None`` chunk size and
        raises TypeError for anything else (here, a string).
        """

        def fresh_response():
            # Each scenario gets its own response over an unread body.
            response = requests.Response()
            response.raw = io.BytesIO(b"the content")
            return response

        for chunk in fresh_response().iter_content(1):
            assert len(chunk) == 1

        whole = fresh_response().iter_content(None)
        assert list(whole) == [b"the content"]

        rejected = fresh_response()
        with pytest.raises(TypeError):
            rejected.iter_content("1024")
1500
1501
    @pytest.mark.parametrize(
        "exception, args, expected",
        (
            (urllib3.exceptions.ProtocolError, tuple(), ChunkedEncodingError),
            (urllib3.exceptions.DecodeError, tuple(), ContentDecodingError),
            (urllib3.exceptions.ReadTimeoutError, (None, "", ""), ConnectionError),
            (urllib3.exceptions.SSLError, tuple(), RequestsSSLError),
        ),
    )
    def test_iter_content_wraps_exceptions(self, httpbin, exception, args, expected):
        """urllib3 errors raised while streaming surface as the ``expected`` exception."""
        # ReadTimeoutError can't be initialized by mock
        # so we'll manually create the instance with args
        failure = exception(*args)

        response = requests.Response()
        response.raw = mock.Mock()
        response.raw.stream.side_effect = failure

        with pytest.raises(expected):
            next(response.iter_content(1024))
1519
1520
    def test_request_and_response_are_pickleable(self, httpbin):
        """Both a response and its originating request survive a pickle round trip."""
        response = requests.get(httpbin("get"))

        # verify we can pickle the original request
        assert pickle.loads(pickle.dumps(response.request))

        # verify we can pickle the response and that we have access to
        # the original request.
        restored = pickle.loads(pickle.dumps(response))
        original_request, restored_request = response.request, restored.request
        assert original_request.url == restored_request.url
        assert original_request.headers == restored_request.headers
1531
1532
    def test_prepared_request_is_pickleable(self, httpbin):
        """A pickled-and-restored ``PreparedRequest`` matches the original and can be sent."""
        prepared = requests.Request("GET", httpbin("get")).prepare()

        # Verify PreparedRequest can be pickled and unpickled
        restored = pickle.loads(pickle.dumps(prepared))
        assert restored.url == prepared.url
        assert restored.headers == prepared.headers
        assert restored.body == prepared.body

        # Verify unpickled PreparedRequest sends properly
        response = requests.Session().send(restored)
        assert response.status_code == 200
1545
1546
    def test_prepared_request_with_file_is_pickleable(self, httpbin):
        """A prepared file-upload request survives pickling and can still be sent."""
        with open(__file__, "rb") as source:
            request = requests.Request("POST", httpbin("post"), files={"file": source})
            prepared = request.prepare()

        # Verify PreparedRequest can be pickled and unpickled
        restored = pickle.loads(pickle.dumps(prepared))
        assert restored.url == prepared.url
        assert restored.headers == prepared.headers
        assert restored.body == prepared.body

        # Verify unpickled PreparedRequest sends properly
        response = requests.Session().send(restored)
        assert response.status_code == 200
1561
1562
    def test_prepared_request_with_hook_is_pickleable(self, httpbin):
        """A prepared request carrying the default hooks survives pickling and sends."""
        request = requests.Request("GET", httpbin("get"), hooks=default_hooks())
        prepared = request.prepare()

        # Verify PreparedRequest can be pickled
        restored = pickle.loads(pickle.dumps(prepared))
        assert restored.url == prepared.url
        assert restored.headers == prepared.headers
        assert restored.body == prepared.body
        assert restored.hooks == prepared.hooks

        # Verify unpickled PreparedRequest sends properly
        response = requests.Session().send(restored)
        assert response.status_code == 200
1577
1578
    def test_cannot_send_unprepared_requests(self, httpbin):
        """``Session.send`` raises ValueError for a ``Request`` that was not prepared."""
        unprepared = requests.Request(url=httpbin())
        session = requests.Session()
        with pytest.raises(ValueError):
            session.send(unprepared)
1582
1583
    def test_http_error(self):
        """``HTTPError`` stores the ``response`` keyword and keeps its message."""
        bare_error = requests.exceptions.HTTPError()
        assert not bare_error.response

        response = requests.Response()

        with_response = requests.exceptions.HTTPError(response=response)
        assert with_response.response == response

        with_message = requests.exceptions.HTTPError("message", response=response)
        assert str(with_message) == "message"
        assert with_message.response == response
1592
1593
    def test_session_pickling(self, httpbin):
        """A session restored from a pickle can still send requests."""
        request = requests.Request("GET", httpbin("get"))

        restored_session = pickle.loads(pickle.dumps(requests.Session()))
        restored_session.proxies = getproxies()

        response = restored_session.send(request.prepare())
        assert response.status_code == 200
1602
1603
    def test_fixes_1329(self, httpbin):
        """Ensure that header updates are done case-insensitively."""
        session = requests.Session()
        session.headers.update({"ACCEPT": "BOGUS"})
        session.headers.update({"accept": "application/json"})

        sent_headers = session.get(httpbin("get")).request.headers
        # Whatever the spelling, the later update must have won.
        for spelling in ("accept", "Accept", "ACCEPT"):
            assert sent_headers[spelling] == "application/json"
1613
1614
    def test_uppercase_scheme_redirect(self, httpbin):
        """A redirect whose target uses an upper-case ``HTTP://`` scheme is followed."""
        target = urlparse(httpbin("html"))
        uppercase_url = "HTTP://" + target.netloc + target.path

        response = requests.get(httpbin("redirect-to"), params={"url": uppercase_url})
        assert response.status_code == 200
        assert response.url.lower() == uppercase_url.lower()
1620
1621
    def test_transport_adapter_ordering(self):
        """Mounted adapter prefixes are listed longest first, as the expected lists show."""
        session = requests.Session()
        assert list(session.adapters) == ["https://", "http://"]

        for prefix in (
            "http://git",
            "http://github",
            "http://github.com",
            "http://github.com/about/",
        ):
            session.mount(prefix, HTTPAdapter())
        assert list(session.adapters) == [
            "http://github.com/about/",
            "http://github.com",
            "http://github",
            "http://git",
            "https://",
            "http://",
        ]

        for prefix in (
            "http://gittip",
            "http://gittip.com",
            "http://gittip.com/about/",
        ):
            session.mount(prefix, HTTPAdapter())
        assert list(session.adapters) == [
            "http://github.com/about/",
            "http://gittip.com/about/",
            "http://github.com",
            "http://gittip.com",
            "http://github",
            "http://gittip",
            "http://git",
            "https://",
            "http://",
        ]

        # ``adapters`` replaced by a plain dict: mounting must still work.
        plain_dict_session = requests.Session()
        plain_dict_session.adapters = {"http://": HTTPAdapter()}
        plain_dict_session.mount("https://", HTTPAdapter())
        assert "http://" in plain_dict_session.adapters
        assert "https://" in plain_dict_session.adapters
1658
1659
    def test_session_get_adapter_prefix_matching(self):
        """get_adapter() returns the adapter of the most specific matching prefix."""
        base_prefix = "https://example.com"
        nested_prefix = base_prefix + "/some/path"

        session = requests.Session()
        base_adapter = HTTPAdapter()
        nested_adapter = HTTPAdapter()
        session.mount(base_prefix, base_adapter)
        session.mount(nested_prefix, nested_adapter)

        # Matches only the shorter prefix.
        assert session.get_adapter(base_prefix + "/another/path") is base_adapter

        # Matches both prefixes; the longer one wins.
        assert session.get_adapter(nested_prefix + "/longer/path") is nested_adapter

        # A different host matches neither custom prefix.
        unrelated = session.get_adapter("https://another.example.com/")
        assert unrelated not in (base_adapter, nested_adapter)
1682
1683
    def test_session_get_adapter_prefix_matching_mixed_case(self):
        """A mixed-case prefix matches a URL that repeats the same casing."""
        prefix = "hTtPs://eXamPle.CoM/MixEd_CAse_PREfix"

        session = requests.Session()
        adapter = HTTPAdapter()
        session.mount(prefix, adapter)

        assert session.get_adapter(prefix + "/full_url") is adapter
1692
1693
    def test_session_get_adapter_prefix_matching_is_case_insensitive(self):
        """A mounted prefix matches a URL that differs from it only in letter case."""
        prefix = "hTtPs://eXamPle.CoM/MixEd_CAse_PREfix"
        differently_cased_url = "HtTpS://exaMPLe.cOm/MiXeD_caSE_preFIX/another_url"

        session = requests.Session()
        adapter = HTTPAdapter()
        session.mount(prefix, adapter)

        assert session.get_adapter(differently_cased_url) is adapter
1704
1705
    def test_session_get_adapter_prefix_with_trailing_slash(self):
        """A prefix ending in "/" does not match a longer hostname (issue #6935)."""
        session = requests.Session()
        adapter = HTTPAdapter()
        session.mount("https://example.com/", adapter)  # trailing slash

        same_host = session.get_adapter("https://example.com/some/path")
        other_host = session.get_adapter("https://example.com.other.com/some/path")

        assert same_host is adapter
        assert other_host is not adapter
1717
1718
    def test_session_get_adapter_prefix_without_trailing_slash(self):
        """A prefix without "/" also matches a hostname that extends it (issue #6935)."""
        session = requests.Session()
        adapter = HTTPAdapter()
        session.mount("https://example.com", adapter)  # no trailing slash

        same_host = session.get_adapter("https://example.com/some/path")
        extended_host = session.get_adapter("https://example.com.other.com/some/path")

        assert same_host is adapter
        assert extended_host is adapter
1730
1731
    def test_header_remove_is_case_insensitive(self, httpbin):
        """A request header set to None drops the session header in any case.

        From issue #1321.
        """
        session = requests.Session()
        session.headers["foo"] = "bar"
        response = session.get(httpbin("get"), headers={"FOO": None})
        assert "foo" not in response.request.headers
1737
1738
    def test_params_are_merged_case_sensitive(self, httpbin):
        """Session and request params that differ only in case are both sent."""
        session = requests.Session()
        session.params["foo"] = "bar"
        response = session.get(httpbin("get"), params={"FOO": "bar"})
        echoed_args = response.json()["args"]
        assert echoed_args == {"foo": "bar", "FOO": "bar"}
1743
1744
    def test_long_authinfo_in_url(self):
        """Preparing a URL with long userinfo and a long hostname leaves it unchanged."""
        username = "E8A3BE87-9E3F-4620-8858-95478E385B5B"
        password = "EA770032-DA4D-4D84-8CE9-29C6D910BF1E"
        hostname = "exactly-------------sixty-----------three------------characters"
        url = "http://{}:{}@{}:9000/path?query#frag".format(
            username, password, hostname
        )

        prepared = requests.Request("GET", url).prepare()
        assert prepared.url == url
1752
1753
    def test_header_keys_are_native(self, httpbin):
        """Header keys given as str or bytes are both found under str keys."""
        request = requests.Request(
            "GET", httpbin("get"), headers={"unicode": "blah", b"byte": "blah"}
        )
        prepared = request.prepare()

        # This is testing that they are builtin strings. A bit weird, but there
        # we go.
        for native_key in ("unicode", "byte"):
            assert native_key in prepared.headers.keys()
1762
1763
    def test_header_validation(self, httpbin):
        """Ensure prepare_headers regex isn't flagging valid header contents."""
        valid_headers = {
            "foo": "bar baz qux",
            "bar": b"fbbq",
            "baz": "",
            "qux": "1",
        }
        response = requests.get(httpbin("get"), headers=valid_headers)
        sent_headers = response.request.headers
        for name, expected in valid_headers.items():
            assert expected == sent_headers[name]
1774
1775
    @pytest.mark.parametrize(
        "invalid_header, key",
        (
            ({"foo": 3}, "foo"),
            ({"bar": {"foo": "bar"}}, "bar"),
            ({"baz": ["foo", "bar"]}, "baz"),
        ),
    )
    def test_header_value_not_str(self, httpbin, invalid_header, key):
        """Header values must be str or bytes (see GH issue #3386).

        Any other value type raises InvalidHeader, and the error message
        names the offending header key.
        """
        with pytest.raises(InvalidHeader) as raised:
            requests.get(httpbin("get"), headers=invalid_header)
        message = str(raised.value)
        assert key in message
1790
1791
    @pytest.mark.parametrize(
        "invalid_header",
        (
            {"foo": "bar\r\nbaz: qux"},
            {"foo": "bar\n\rbaz: qux"},
            {"foo": "bar\nbaz: qux"},
            {"foo": "bar\rbaz: qux"},
            {"fo\ro": "bar"},
            {"fo\r\no": "bar"},
            {"fo\n\ro": "bar"},
            {"fo\no": "bar"},
            {"foo": "bar\n"},
            {"foo\n": "bar"},
            {"foo": "bar\r\n"},
            {"foo": "\n"},
            {"foo": "\r\n"},
        ),
    )
    def test_header_no_return_chars(self, httpbin, invalid_header):
        """Header names or values containing CR/LF raise InvalidHeader.

        Without this check a single string could produce multiple headers.
        """
        url = httpbin("get")
        with pytest.raises(InvalidHeader):
            requests.get(url, headers=invalid_header)
1815
1816
    @pytest.mark.parametrize(
        "invalid_header",
        (
            {" foo": "bar"},
            {"\tfoo": "bar"},
            {"    foo": "bar"},
            {"foo": " bar"},
            {"foo": "    bar"},
            {"foo": "\tbar"},
            {" ": "bar"},
        ),
    )
    def test_header_no_leading_space(self, httpbin, invalid_header):
        """Header names or values with leading whitespace raise InvalidHeader."""
        url = httpbin("get")
        with pytest.raises(InvalidHeader):
            requests.get(url, headers=invalid_header)
1834
1835
    def test_header_with_subclass_types(self, httpbin):
        """Plain str/bytes subclasses are accepted as header names and values.

        Subclasses that do not behave *exactly* like the base bytes/str
        classes are not supported; this test exists for backwards
        compatibility.
        """

        class MyString(str):
            pass

        class MyBytes(bytes):
            pass

        url = httpbin("get")

        str_key = requests.get(url, headers={MyString("x-custom"): "myheader"})
        assert str_key.request.headers["x-custom"] == "myheader"

        bytes_key = requests.get(url, headers={MyBytes(b"x-custom"): b"myheader"})
        assert bytes_key.request.headers["x-custom"] == b"myheader"

        mixed = requests.get(
            url, headers={MyString("x-custom"): MyBytes(b"myheader")}
        )
        assert mixed.request.headers["x-custom"] == b"myheader"
1859
1860
    @pytest.mark.parametrize("files", ("foo", b"foo", bytearray(b"foo")))
    def test_can_send_objects_with_files(self, httpbin, files):
        """str, bytes and bytearray file payloads all produce a multipart body."""
        request = requests.Request(
            "POST",
            httpbin("post"),
            data={"a": "this is a string"},
            files={"b": files},
        )
        prepared = request.prepare()
        assert "multipart/form-data" in prepared.headers["Content-Type"]
1867
1868
    def test_can_send_file_object_with_non_string_filename(self, httpbin):
        """A file object whose ``name`` is not a string can still be prepared."""
        upload = io.BytesIO()
        upload.name = 2

        prepared = requests.Request(
            "POST", httpbin("post"), files={"f": upload}
        ).prepare()

        assert "multipart/form-data" in prepared.headers["Content-Type"]
1875
1876
    def test_autoset_header_values_are_native(self, httpbin):
        """The automatically set Content-Length header value is a str."""
        payload = "this is a string"
        expected_length = "16"

        prepared = requests.Request("POST", httpbin("post"), data=payload).prepare()

        assert prepared.headers["Content-Length"] == expected_length
1883
1884
    def test_nonhttp_schemes_dont_check_URLs(self):
        """URLs with non-HTTP schemes come out of prepare() unchanged."""
        test_urls = (
            "data:image/gif;base64,R0lGODlhAQABAHAAACH5BAUAAAAALAAAAAABAAEAAAICRAEAOw==",
            "file:///etc/passwd",
            "magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431",
        )
        for original_url in test_urls:
            prepared = requests.Request("GET", original_url).prepare()
            assert original_url == prepared.url
1894
1895
    def test_auth_is_stripped_on_http_downgrade(
        self, httpbin, httpbin_secure, httpbin_ca_bundle
    ):
        """Authorization is dropped when an https request redirects to http."""
        response = requests.get(
            httpbin_secure("redirect-to"),
            params={"url": httpbin("get")},
            auth=("user", "pass"),
            verify=httpbin_ca_bundle,
        )
        first_request = response.history[0].request
        final_request = response.request

        assert first_request.headers["Authorization"]
        assert "Authorization" not in final_request.headers
1906
1907
    def test_auth_is_retained_for_redirect_on_host(self, httpbin):
        """Authorization is kept unchanged across a redirect on the same host."""
        response = requests.get(httpbin("redirect/1"), auth=("user", "pass"))

        before_redirect = response.history[0].request.headers["Authorization"]
        after_redirect = response.request.headers["Authorization"]

        assert before_redirect == after_redirect
1913
1914
    def test_should_strip_auth_host_change(self):
        """should_strip_auth() is true when the redirect changes the host."""
        session = requests.Session()
        old_url = "http://example.com/foo"
        new_url = "http://another.example.com/"
        assert session.should_strip_auth(old_url, new_url)
1919
1920
    def test_should_strip_auth_http_downgrade(self):
        """should_strip_auth() is true for an https -> http redirect."""
        session = requests.Session()
        old_url = "https://example.com/foo"
        new_url = "http://example.com/bar"
        assert session.should_strip_auth(old_url, new_url)
1923
1924
    def test_should_strip_auth_https_upgrade(self):
        """An http -> https upgrade keeps auth only when default ports are used."""
        session = requests.Session()

        keeps_auth = (
            ("http://example.com/foo", "https://example.com/bar"),
            ("http://example.com:80/foo", "https://example.com/bar"),
            ("http://example.com/foo", "https://example.com:443/bar"),
        )
        for old_url, new_url in keeps_auth:
            assert not session.should_strip_auth(old_url, new_url)

        # Non-standard ports should trigger stripping
        strips_auth = (
            ("http://example.com:8080/foo", "https://example.com/bar"),
            ("http://example.com/foo", "https://example.com:8443/bar"),
        )
        for old_url, new_url in strips_auth:
            assert session.should_strip_auth(old_url, new_url)
1942
1943
    def test_should_strip_auth_port_change(self):
        """should_strip_auth() is true when the redirect changes the port."""
        session = requests.Session()
        old_url = "http://example.com:1234/foo"
        new_url = "https://example.com:4321/bar"
        assert session.should_strip_auth(old_url, new_url)
1948
1949
    @pytest.mark.parametrize(
        "old_uri, new_uri",
        (
            ("https://example.com:443/foo", "https://example.com/bar"),
            ("http://example.com:80/foo", "http://example.com/bar"),
            ("https://example.com/foo", "https://example.com:443/bar"),
            ("http://example.com/foo", "http://example.com:80/bar"),
        ),
    )
    def test_should_strip_auth_default_port(self, old_uri, new_uri):
        """Spelling out the scheme's default port does not cause auth stripping."""
        session = requests.Session()
        must_strip = session.should_strip_auth(old_uri, new_uri)
        assert not must_strip
1961
1962
    def test_manual_redirect_with_partial_body_read(self, httpbin):
        """Redirects can be followed manually after a partial or full body read.

        The redirect chain is driven by hand through
        ``Session.resolve_redirects`` while the intermediate streamed
        responses are read only partly (first hop) or completely (second hop).
        """
        s = requests.Session()
        r1 = s.get(httpbin("redirect/2"), allow_redirects=False, stream=True)
        assert r1.is_redirect
        rg = s.resolve_redirects(r1, r1.request, stream=True)

        # read only the first eight bytes of the response body,
        # then follow the redirect. iter_content() only hands back an
        # iterator, so it has to be advanced for any bytes to be read; the
        # None default keeps an empty body from raising StopIteration.
        next(r1.iter_content(8), None)
        r2 = next(rg)
        assert r2.is_redirect

        # read all of the response via iter_content,
        # then follow the redirect
        for _ in r2.iter_content():
            pass
        r3 = next(rg)
        assert not r3.is_redirect
1980
1981
    def test_prepare_body_position_non_stream(self):
        """A plain bytes body leaves ``_body_position`` unset (None)."""
        request = requests.Request("GET", "http://example.com", data=b"the data")
        prepared = request.prepare()
        assert prepared._body_position is None
1985
1986
    def test_rewind_body(self):
        """rewind_body() lets a fully read stream body be read again from the start."""
        stream = io.BytesIO(b"the data")
        prepared = requests.Request(
            "GET", "http://example.com", data=stream
        ).prepare()
        assert prepared._body_position == 0
        assert prepared.body.read() == b"the data"

        # the data has all been read
        assert prepared.body.read() == b""

        # rewind it back
        requests.utils.rewind_body(prepared)
        assert prepared.body.read() == b"the data"
1998
1999
    def test_rewind_partially_read_body(self):
        """rewind_body() returns to the position the stream had at prepare time."""
        stream = io.BytesIO(b"the data")
        stream.read(4)  # read some data
        prepared = requests.Request(
            "GET", "http://example.com", data=stream
        ).prepare()
        assert prepared._body_position == 4
        assert prepared.body.read() == b"data"

        # the data has all been read
        assert prepared.body.read() == b""

        # rewind it back
        requests.utils.rewind_body(prepared)
        assert prepared.body.read() == b"data"
2012
2013
    def test_rewind_body_no_seek(self):
        """rewind_body() raises UnrewindableBodyError when the body has no seek()."""

        class BadFileObj:
            """Stream-like object with tell() and __iter__ but no seek()."""

            def __init__(self, data):
                self.data = data

            def tell(self):
                return 0

            def __iter__(self):
                return

        data = BadFileObj("the data")
        prep = requests.Request("GET", "http://example.com", data=data).prepare()
        assert prep._body_position == 0

        with pytest.raises(UnrewindableBodyError) as e:
            requests.utils.rewind_body(prep)

        # Check the exception's own message rather than the string form of
        # pytest's ExceptionInfo wrapper.
        assert "Unable to rewind request body" in str(e.value)
2032
2033
    def test_rewind_body_failed_seek(self):
        """rewind_body() raises UnrewindableBodyError when seek() fails with OSError."""

        class BadFileObj:
            """Stream-like object whose seek() always raises OSError."""

            def __init__(self, data):
                self.data = data

            def tell(self):
                return 0

            def seek(self, pos, whence=0):
                raise OSError()

            def __iter__(self):
                return

        data = BadFileObj("the data")
        prep = requests.Request("GET", "http://example.com", data=data).prepare()
        assert prep._body_position == 0

        with pytest.raises(UnrewindableBodyError) as e:
            requests.utils.rewind_body(prep)

        # Check the exception's own message rather than the string form of
        # pytest's ExceptionInfo wrapper.
        assert "error occurred when rewinding request body" in str(e.value)
2055
2056
    def test_rewind_body_failed_tell(self):
        """rewind_body() raises UnrewindableBodyError when tell() failed at prepare time."""

        class BadFileObj:
            """Stream-like object whose tell() always raises OSError."""

            def __init__(self, data):
                self.data = data

            def tell(self):
                raise OSError()

            def __iter__(self):
                return

        data = BadFileObj("the data")
        prep = requests.Request("GET", "http://example.com", data=data).prepare()
        # Even though tell() failed, a (non-None) position marker is recorded.
        assert prep._body_position is not None

        with pytest.raises(UnrewindableBodyError) as e:
            requests.utils.rewind_body(prep)

        # Check the exception's own message rather than the string form of
        # pytest's ExceptionInfo wrapper.
        assert "Unable to rewind request body" in str(e.value)
2075
2076
    def test_getattr_proxy_stream_follows_redirect(self, httpbin):
        """Stream wrappers exposing __iter__ only via __getattr__ are still detected."""

        class AttrProxy:
            """Forwards every attribute lookup to a wrapped BytesIO."""

            def __init__(self):
                self._file = io.BytesIO(b"data")

            def __getattr__(self, name):
                return getattr(self._file, name)

        redirect_url = httpbin("redirect-to?url=/post&status_code=307")
        response = requests.post(redirect_url, data=AttrProxy())
        assert response.json()["data"] == "data"
2090
2091
    def _patch_adapter_gzipped_redirect(self, session, url):
        """Make the adapter for *url* label its first response as gzip-encoded.

        The adapter's ``build_response`` is wrapped so that only the first
        response built gets a ``content-encoding: gzip`` raw header;
        ``self._patched_response`` records that this has happened.
        """
        adapter = session.get_adapter(url=url)
        original_build_response = adapter.build_response
        self._patched_response = False

        def build_response(*args, **kwargs):
            response = original_build_response(*args, **kwargs)
            if self._patched_response:
                return response
            response.raw.headers["content-encoding"] = "gzip"
            self._patched_response = True
            return response

        adapter.build_response = build_response
2104
2105
    def test_redirect_with_wrong_gzipped_header(self, httpbin):
        """A redirect response wrongly labelled as gzip can still be followed."""
        session = requests.Session()
        redirect_url = httpbin("redirect/1")
        self._patch_adapter_gzipped_redirect(session, redirect_url)
        session.get(redirect_url)
2110
2111
    @pytest.mark.parametrize(
        "username, password, auth_str",
        (
            ("test", "test", "Basic dGVzdDp0ZXN0"),
            (
                "имя".encode(),
                "пароль".encode(),
                "Basic 0LjQvNGPOtC/0LDRgNC+0LvRjA==",
            ),
        ),
    )
    def test_basic_auth_str_is_always_native(self, username, password, auth_str):
        """_basic_auth_str() returns a native str for both str and bytes credentials."""
        header_value = _basic_auth_str(username, password)
        assert isinstance(header_value, builtin_str)
        assert header_value == auth_str
2126
2127
    def test_requests_history_is_saved(self, httpbin):
        """Each response in a redirect chain carries the history that preceded it."""
        response = requests.get(httpbin("redirect/5"))
        longest_history = response.history[-1].history
        for position, hop in enumerate(response.history):
            assert hop.history == longest_history[0:position]
2134
2135
    def test_json_param_post_content_type_works(self, httpbin):
        """Posting with ``json=`` sets a JSON Content-Type and sends the payload."""
        payload = {"life": 42}
        response = requests.post(httpbin("post"), json=payload)

        assert response.status_code == 200
        assert "application/json" in response.request.headers["Content-Type"]
        assert {"life": 42} == response.json()["json"]
2140
2141
    def test_json_param_post_should_not_override_data_param(self, httpbin):
        """When both ``data`` and ``json`` are given, the body comes from ``data``."""
        request = requests.Request(
            method="POST",
            url=httpbin("post"),
            data={"stuff": "elixr"},
            json={"music": "flute"},
        )
        prepared = request.prepare()
        assert "stuff=elixr" == prepared.body
2150
2151
    def test_response_iter_lines(self, httpbin):
        """iter_lines() yields three more lines after the first of a 4-line stream."""
        response = requests.get(httpbin("stream/4"), stream=True)
        assert response.status_code == 200

        lines = response.iter_lines()
        next(lines)
        remaining = list(lines)
        assert len(remaining) == 3
2158
2159
    def test_response_context_manager(self, httpbin):
        """Leaving a ``with`` block on a Response closes its raw stream."""
        url = httpbin("stream/4")
        with requests.get(url, stream=True) as response:
            assert isinstance(response, requests.Response)

        assert response.raw.closed
2164
2165
    def test_unconsumed_session_response_closes_connection(self, httpbin):
        """Closing an unread streamed response closes its raw stream."""
        session = requests.session()
        url = httpbin("stream/4")

        with contextlib.closing(session.get(url, stream=True)) as response:
            pass

        assert response._content_consumed is False
        assert response.raw.closed
2173
2174
    @pytest.mark.xfail
    def test_response_iter_lines_reentrant(self, httpbin):
        """Response.iter_lines() is not reentrant safe"""
        response = requests.get(httpbin("stream/4"), stream=True)
        assert response.status_code == 200

        # Take one line from a first iterator, then start a second one.
        next(response.iter_lines())
        remaining = list(response.iter_lines())
        assert len(remaining) == 3
2182
2183
    def test_session_close_proxy_clear(self):
        """Session.close() calls clear() once on every proxy manager of the adapter."""
        fake_managers = {"one": mock.Mock(), "two": mock.Mock()}
        session = requests.Session()
        proxy_manager = session.adapters["http://"].proxy_manager
        with mock.patch.dict(proxy_manager, fake_managers):
            session.close()
            for name in ("one", "two"):
                fake_managers[name].clear.assert_called_once_with()
2193
2194
    def test_proxy_auth(self):
        """proxy_headers() turns proxy URL credentials into Proxy-Authorization."""
        proxy_url = "http://user:pass@httpbin.org"
        headers = HTTPAdapter().proxy_headers(proxy_url)
        assert headers == {"Proxy-Authorization": "Basic dXNlcjpwYXNz"}
2198
2199
    def test_proxy_auth_empty_pass(self):
        """proxy_headers() still emits Proxy-Authorization for an empty password."""
        proxy_url = "http://user:@httpbin.org"
        headers = HTTPAdapter().proxy_headers(proxy_url)
        assert headers == {"Proxy-Authorization": "Basic dXNlcjo="}
2203
2204
    def test_response_json_when_content_is_None(self, httpbin):
        """Response.json() raises ValueError when ``content`` is None."""
        response = requests.get(httpbin("/status/204"))

        # Make sure r.content is None
        response.status_code = 0
        response._content = False
        response._content_consumed = False
        assert response.content is None

        with pytest.raises(ValueError):
            response.json()
2214
2215
    def test_response_without_release_conn(self):
        """Test `close` call for non-urllib3-like raw objects.

        Response.close() should work when the `release_conn` attribute
        doesn't exist on `response.raw`.
        """
        response = requests.Response()
        response.raw = StringIO.StringIO("test")
        assert not response.raw.closed

        response.close()
        assert response.raw.closed
2224
2225
    def test_empty_stream_with_auth_does_not_set_content_length_header(self, httpbin):
        """Ensure that a byte stream with size 0 will not set both a Content-Length
        and Transfer-Encoding header.
        """
        empty_stream = io.BytesIO(b"")
        request = requests.Request(
            "POST", httpbin("post"), auth=("user", "pass"), data=empty_stream
        )
        headers = request.prepare().headers
        assert "Transfer-Encoding" in headers
        assert "Content-Length" not in headers
2236
2237
    def test_stream_with_auth_does_not_set_transfer_encoding_header(self, httpbin):
        """Ensure that a byte stream with size > 0 will not set both a Content-Length
        and Transfer-Encoding header.
        """
        stream = io.BytesIO(b"test data")
        request = requests.Request(
            "POST", httpbin("post"), auth=("user", "pass"), data=stream
        )
        headers = request.prepare().headers
        assert "Transfer-Encoding" not in headers
        assert "Content-Length" in headers
2248
2249
    def test_chunked_upload_does_not_set_content_length_header(self, httpbin):
        """Ensure that requests with a generator body stream using
        Transfer-Encoding: chunked, not a Content-Length header.
        """
        chunks = (chunk for chunk in [b"a", b"b", b"c"])
        request = requests.Request("POST", httpbin("post"), data=chunks)
        headers = request.prepare().headers
        assert "Transfer-Encoding" in headers
        assert "Content-Length" not in headers
2259
2260
    def test_custom_redirect_mixin(self, httpbin):
        """Tests a custom mixin to overwrite ``get_redirect_target``.

        Ensures a subclassed ``requests.Session`` can handle a certain type of
        malformed redirect responses.

        1. original request receives a proper response: 302 redirect
        2. following the redirect, a malformed response is given:
            status code = HTTP 200
            location = alternate url
        3. the custom session catches the edge case and follows the redirect
        """
        final_url = httpbin("html")
        malformed_query = urlencode({"location": final_url})
        malformed_url = httpbin("response-headers?%s" % malformed_query)
        redirect_query = urlencode({"url": malformed_url})
        start_url = httpbin("redirect-to?%s" % redirect_query)

        class CustomRedirectSession(requests.Session):
            def get_redirect_target(self, resp):
                # default behavior
                if resp.is_redirect:
                    return resp.headers["location"]
                # edge case - check to see if 'location' is in headers anyways
                location = resp.headers.get("location")
                if not location or location == resp.url:
                    return None
                return location

        response = CustomRedirectSession().get(start_url)

        assert len(response.history) == 2
        assert response.status_code == 200

        proper_redirect, malformed_redirect = response.history
        assert proper_redirect.status_code == 302
        assert proper_redirect.is_redirect
        assert malformed_redirect.status_code == 200
        assert not malformed_redirect.is_redirect

        assert response.url == final_url
2303
2304
2305
class TestCaseInsensitiveDict:
    """Tests for ``CaseInsensitiveDict``: case-insensitive lookup, key-case retention."""

    @pytest.mark.parametrize(
        "cid",
        (
            CaseInsensitiveDict({"Foo": "foo", "BAr": "bar"}),
            CaseInsensitiveDict([("Foo", "foo"), ("BAr", "bar")]),
            CaseInsensitiveDict(FOO="foo", BAr="bar"),
        ),
    )
    def test_init(self, cid):
        """Construction from a mapping, from pairs and from keywords all work."""
        assert len(cid) == 2
        for lowered_key in ("foo", "bar"):
            assert lowered_key in cid

    def test_docstring_example(self):
        """Lookup ignores case while iteration yields the key as it was set."""
        headers = CaseInsensitiveDict()
        headers["Accept"] = "application/json"
        assert headers["aCCEPT"] == "application/json"
        assert list(headers) == ["Accept"]

    def test_len(self):
        """Re-setting a key with different case does not add an entry."""
        mapping = CaseInsensitiveDict({"a": "a", "b": "b"})
        mapping["A"] = "a"
        assert len(mapping) == 2

    def test_getitem(self):
        """Item access works for any casing of the key."""
        mapping = CaseInsensitiveDict({"Spam": "blueval"})
        for spelling in ("spam", "SPAM"):
            assert mapping[spelling] == "blueval"

    def test_fixes_649(self):
        """__setitem__ should behave case-insensitively."""
        mapping = CaseInsensitiveDict()
        assignments = (
            ("spam", "oneval"),
            ("Spam", "twoval"),
            ("sPAM", "redval"),
            ("SPAM", "blueval"),
        )
        for key, value in assignments:
            mapping[key] = value

        assert mapping["spam"] == "blueval"
        assert mapping["SPAM"] == "blueval"
        assert list(mapping.keys()) == ["SPAM"]

    def test_delitem(self):
        """Deleting with a differently cased key removes the entry."""
        mapping = CaseInsensitiveDict()
        mapping["Spam"] = "someval"
        del mapping["sPam"]
        assert "spam" not in mapping
        assert len(mapping) == 0

    def test_contains(self):
        """Membership checks ignore case but still reject unknown keys."""
        mapping = CaseInsensitiveDict()
        mapping["Spam"] = "someval"
        for spelling in ("Spam", "spam", "SPAM", "sPam"):
            assert spelling in mapping
        assert "notspam" not in mapping

    def test_get(self):
        """get() ignores case and falls back to the default for missing keys."""
        mapping = CaseInsensitiveDict()
        mapping["spam"] = "oneval"
        mapping["SPAM"] = "blueval"
        for spelling in ("spam", "SPAM", "sPam"):
            assert mapping.get(spelling) == "blueval"
        assert mapping.get("notspam", "default") == "default"

    def test_update(self):
        """update() overwrites existing entries case-insensitively."""
        single = CaseInsensitiveDict()
        single["spam"] = "blueval"
        single.update({"sPam": "notblueval"})
        assert single["spam"] == "notblueval"

        pair = CaseInsensitiveDict({"Foo": "foo", "BAr": "bar"})
        pair.update({"fOO": "anotherfoo", "bAR": "anotherbar"})
        assert len(pair) == 2
        assert pair["foo"] == "anotherfoo"
        assert pair["bar"] == "anotherbar"

    def test_update_retains_unchanged(self):
        """update() leaves keys it does not mention untouched."""
        mapping = CaseInsensitiveDict({"foo": "foo", "bar": "bar"})
        mapping.update({"foo": "newfoo"})
        assert mapping["bar"] == "bar"

    def test_iter(self):
        """Iteration yields the keys with their original casing."""
        mapping = CaseInsensitiveDict({"Spam": "spam", "Eggs": "eggs"})
        expected_keys = frozenset(["Spam", "Eggs"])
        assert frozenset(iter(mapping)) == expected_keys

    def test_equality(self):
        """Equality ignores key case and also works against plain dicts."""
        left = CaseInsensitiveDict({"SPAM": "blueval", "Eggs": "redval"})
        right = CaseInsensitiveDict({"spam": "blueval", "eggs": "redval"})
        assert left == right

        del right["spam"]
        assert left != right

        assert left == {"spam": "blueval", "eggs": "redval"}
        assert left != object()

    def test_setdefault(self):
        """setdefault() keeps an existing value and stores one for a new key."""
        mapping = CaseInsensitiveDict({"Spam": "blueval"})
        assert mapping.setdefault("spam", "notblueval") == "blueval"
        assert mapping.setdefault("notspam", "notblueval") == "notblueval"

    def test_lower_items(self):
        """lower_items() yields every key in lower case."""
        mapping = CaseInsensitiveDict(
            {
                "Accept": "application/json",
                "user-Agent": "requests",
            }
        )
        lowered_keys = frozenset(key for key, _ in mapping.lower_items())
        assert lowered_keys == frozenset(["accept", "user-agent"])

    def test_preserve_key_case(self):
        """items(), keys() and iteration all report the original key casing."""
        mapping = CaseInsensitiveDict(
            {
                "Accept": "application/json",
                "user-Agent": "requests",
            }
        )
        expected_keys = frozenset(["Accept", "user-Agent"])
        assert frozenset(key for key, _ in mapping.items()) == expected_keys
        assert frozenset(mapping.keys()) == expected_keys
        assert frozenset(mapping) == expected_keys

    def test_preserve_last_key_case(self):
        """After re-setting a key, the most recently used casing is reported."""
        mapping = CaseInsensitiveDict(
            {
                "Accept": "application/json",
                "user-Agent": "requests",
            }
        )
        mapping.update({"ACCEPT": "application/json"})
        mapping["USER-AGENT"] = "requests"

        expected_keys = frozenset(["ACCEPT", "USER-AGENT"])
        assert frozenset(key for key, _ in mapping.items()) == expected_keys
        assert frozenset(mapping.keys()) == expected_keys
        assert frozenset(mapping) == expected_keys

    def test_copy(self):
        """copy() returns an equal dict that is independent of the original."""
        original = CaseInsensitiveDict(
            {
                "Accept": "application/json",
                "user-Agent": "requests",
            }
        )
        duplicate = original.copy()
        assert original == duplicate

        original["changed"] = True
        assert original != duplicate
2454
2455
2456
class TestMorselToCookieExpires:
    """Exercise ``morsel_to_cookie`` for morsels carrying ``expires``."""

    def test_expires_valid_str(self):
        """An ``expires`` date string is converted to a numeric timestamp."""
        source = Morsel()
        source["expires"] = "Thu, 01-Jan-1970 00:00:01 GMT"
        assert morsel_to_cookie(source).expires == 1

    @pytest.mark.parametrize(
        "value, exception",
        [
            (100, TypeError),
            ("woops", ValueError),
        ],
    )
    def test_expires_invalid_int(self, value, exception):
        """An unusable ``expires`` value raises the matching exception."""
        source = Morsel()
        source["expires"] = value
        with pytest.raises(exception):
            morsel_to_cookie(source)

    def test_expires_none(self):
        """An ``expires`` of ``None`` is carried over as ``None``."""
        source = Morsel()
        source["expires"] = None
        assert morsel_to_cookie(source).expires is None
2488
2489
2490
class TestMorselToCookieMaxAge:
    """Exercise ``morsel_to_cookie`` for morsels carrying ``max-age``."""

    def test_max_age_valid_int(self):
        """An integer ``max-age`` in seconds produces an integer ``expires``."""
        source = Morsel()
        source["max-age"] = 60
        assert isinstance(morsel_to_cookie(source).expires, int)

    def test_max_age_invalid_str(self):
        """A non-numeric ``max-age`` is rejected with ``TypeError``."""
        source = Morsel()
        source["max-age"] = "woops"
        with pytest.raises(TypeError):
            morsel_to_cookie(source)
2508
2509
2510
class TestTimeout:
    """Tests for how the ``timeout`` argument is validated and enforced."""

    def test_stream_timeout(self, httpbin):
        """A response slower than the timeout raises a read-timeout error."""
        # pytest.raises makes the test fail if no timeout happens at all;
        # a bare try/except would silently pass in that case.
        with pytest.raises(requests.exceptions.Timeout) as excinfo:
            requests.get(httpbin("delay/10"), timeout=2.0)
        assert "Read timed out" in excinfo.value.args[0].args[0]

    @pytest.mark.parametrize(
        "timeout, error_text",
        (
            ((3, 4, 5), "(connect, read)"),
            ("foo", "must be an int, float or None"),
        ),
    )
    def test_invalid_timeout(self, httpbin, timeout, error_text):
        """Malformed timeout values raise ``ValueError`` with a helpful text."""
        with pytest.raises(ValueError) as excinfo:
            requests.get(httpbin("get"), timeout=timeout)
        # Inspect the exception itself rather than the ExceptionInfo wrapper.
        assert error_text in str(excinfo.value)

    @pytest.mark.parametrize("timeout", (None, Urllib3Timeout(connect=None, read=None)))
    def test_none_timeout(self, httpbin, timeout):
        """Check that you can set None as a valid timeout value.

        To actually test this behavior, we'd want to check that setting the
        timeout to None actually lets the request block past the system default
        timeout. However, this would make the test suite unbearably slow.
        Instead we verify that setting the timeout to None does not prevent the
        request from succeeding.
        """
        r = requests.get(httpbin("get"), timeout=timeout)
        assert r.status_code == 200

    @pytest.mark.parametrize(
        "timeout", ((None, 0.1), Urllib3Timeout(connect=None, read=0.1))
    )
    def test_read_timeout(self, httpbin, timeout):
        """A short read timeout against a slow endpoint raises ``ReadTimeout``."""
        with pytest.raises(ReadTimeout):
            requests.get(httpbin("delay/10"), timeout=timeout)

    @pytest.mark.parametrize(
        "timeout", ((0.1, None), Urllib3Timeout(connect=0.1, read=None))
    )
    def test_connect_timeout(self, timeout):
        """``ConnectTimeout`` is both a ``ConnectionError`` and a ``Timeout``."""
        with pytest.raises(ConnectTimeout) as excinfo:
            requests.get(TARPIT, timeout=timeout)
        assert isinstance(excinfo.value, ConnectionError)
        assert isinstance(excinfo.value, Timeout)

    @pytest.mark.parametrize(
        "timeout", ((0.1, 0.1), Urllib3Timeout(connect=0.1, read=0.1))
    )
    def test_total_timeout_connect(self, timeout):
        """With both timeouts set, the tarpit still yields ``ConnectTimeout``."""
        with pytest.raises(ConnectTimeout):
            requests.get(TARPIT, timeout=timeout)

    def test_encoded_methods(self, httpbin):
        """See: https://github.com/psf/requests/issues/2316"""
        r = requests.request(b"GET", httpbin("get"))
        assert r.ok
2577
2578
2579
# Record of one ``send()`` invocation: positional args and keyword args.
SendCall = collections.namedtuple("SendCall", ["args", "kwargs"])
2580
2581
2582
class RedirectSession(SessionRedirectMixin):
    """Stand-in session that records ``send()`` calls and replays status codes.

    Each response takes the next entry of ``order_of_redirects`` as its
    status code; once the list is exhausted the status code is 200.
    """

    def __init__(self, order_of_redirects):
        self.redirects = order_of_redirects
        self.calls = []
        self.max_redirects = 30
        self.cookies = {}
        self.trust_env = False

    def send(self, *args, **kwargs):
        """Log the call as a ``SendCall`` and return a fabricated response."""
        self.calls.append(SendCall(args, kwargs))
        return self.build_response()

    def build_response(self):
        """Build a response for the request of the most recent ``send()``."""
        last_request = self.calls[-1].args[0]

        response = requests.Response()
        response.url = last_request.url
        # Consume the next scripted status code, or fall back to 200.
        response.status_code = int(self.redirects.pop(0)) if self.redirects else 200
        response.headers = CaseInsensitiveDict({"Location": "/"})
        response.raw = self._build_raw()
        response.request = last_request
        return response

    def _build_raw(self):
        """Return an empty text stream with a ``release_conn`` attribute."""
        raw = StringIO.StringIO("")
        raw.release_conn = lambda *args: args
        return raw
2613
2614
2615
def test_json_encodes_as_bytes():
    """A ``json=`` payload is stored on the prepared request as bytes."""
    # urllib3 expects bodies as bytes-like objects
    prepared = PreparedRequest()
    prepared.prepare(
        method="GET", url="https://www.example.com/", json={"key": "value"}
    )
    assert isinstance(prepared.body, bytes)
2621
2622
2623
def test_requests_are_updated_each_time(httpbin):
    """Every redirect hop is re-sent as a GET with the default send kwargs."""
    session = RedirectSession([303, 307])
    prepared = requests.Request("POST", httpbin("post")).prepare()

    first_response = session.send(prepared)
    assert first_response.request.method == "POST"
    assert session.calls[-1] == SendCall((first_response.request,), {})

    expected_kwargs = dict(
        stream=False,
        verify=True,
        cert=None,
        timeout=None,
        allow_redirects=False,
        proxies={},
    )
    for hop in session.resolve_redirects(first_response, prepared):
        assert hop.request.method == "GET"
        assert session.calls[-1] == SendCall((hop.request,), expected_kwargs)
2642
2643
2644
@pytest.mark.parametrize(
    "var,url,proxy",
    [
        ("http_proxy", "http://example.com", "socks5://proxy.com:9876"),
        ("https_proxy", "https://example.com", "socks5://proxy.com:9876"),
        ("all_proxy", "http://example.com", "socks5://proxy.com:9876"),
        ("all_proxy", "https://example.com", "socks5://proxy.com:9876"),
    ],
)
def test_proxy_env_vars_override_default(var, url, proxy):
    """A proxy set through the environment shows up in ``rebuild_proxies``."""
    prepared = PreparedRequest()
    prepared.prepare(method="GET", url=url)
    session = requests.Session()

    url_scheme = urlparse(url).scheme
    with override_environ(**{var: proxy}):
        rebuilt = session.rebuild_proxies(prepared, {})
        assert url_scheme in rebuilt
        assert rebuilt[url_scheme] == proxy
2664
2665
2666
@pytest.mark.parametrize(
    "data",
    (
        (("a", "b"), ("c", "d")),
        (("c", "d"), ("a", "b")),
        (("a", "b"), ("c", "d"), ("e", "f")),
    ),
)
def test_data_argument_accepts_tuples(data):
    """Tuples of string pairs passed as ``data`` are encoded like ``urlencode``."""
    prepare_kwargs = {
        "method": "GET",
        "url": "http://www.example.com",
        "data": data,
        "hooks": default_hooks(),
    }
    prepared = PreparedRequest()
    prepared.prepare(**prepare_kwargs)
    assert prepared.body == urlencode(data)
2683
2684
2685
@pytest.mark.parametrize(
    "kwargs",
    (
        None,
        {
            "method": "GET",
            "url": "http://www.example.com",
            "data": "foo=bar",
            "hooks": default_hooks(),
        },
        {
            "method": "GET",
            "url": "http://www.example.com",
            "data": "foo=bar",
            "hooks": default_hooks(),
            "cookies": {"foo": "bar"},
        },
        {"method": "GET", "url": "http://www.example.com/üniçø∂é"},
    ),
)
def test_prepared_copy(kwargs):
    """``PreparedRequest.copy()`` matches the original on each compared field."""
    original = PreparedRequest()
    if kwargs:
        original.prepare(**kwargs)
    duplicate = original.copy()

    compared_fields = ("method", "url", "headers", "_cookies", "body", "hooks")
    for field in compared_fields:
        assert getattr(original, field) == getattr(duplicate, field)
2712
2713
2714
def test_urllib3_retries(httpbin):
    """Exhausting a urllib3 ``Retry`` on status 500 raises ``RetryError``."""
    from urllib3.util import Retry

    retry_policy = Retry(total=2, status_forcelist=[500])
    session = requests.Session()
    session.mount("http://", HTTPAdapter(max_retries=retry_policy))

    with pytest.raises(RetryError):
        session.get(httpbin("status/500"))
2722
2723
2724
def test_urllib3_pool_connection_closed(httpbin):
    """A request through a zero-sized pool fails with "Pool is closed."."""
    s = requests.Session()
    s.mount("http://", HTTPAdapter(pool_connections=0, pool_maxsize=0))

    # pytest.raises makes the test fail when no ConnectionError is raised;
    # a bare try/except would pass vacuously in that case.
    with pytest.raises(ConnectionError, match=r"Pool is closed\."):
        s.get(httpbin("status/200"))
2732
2733
2734
class TestPreparingURLs:
    """Tests for URL preparation, plus JSON, status-code and TLS pool checks."""

    @staticmethod
    def _send_canned_response(sock):
        """Drain the request from ``sock`` and reply with a fixed 200 response."""
        consume_socket_content(sock, timeout=0.5)
        sock.send(
            b"HTTP/1.1 200 OK\r\n"
            b"Content-Length: 18\r\n\r\n"
            b'\xff\xfe{\x00"\x00K0"\x00=\x00"\x00\xab0"\x00\r\n'
        )

    @pytest.mark.parametrize(
        "url,expected",
        (
            ("http://google.com", "http://google.com/"),
            ("http://ジェーピーニック.jp", "http://xn--hckqz9bzb1cyrb.jp/"),
            ("http://xn--n3h.net/", "http://xn--n3h.net/"),
            ("http://ジェーピーニック.jp".encode(), "http://xn--hckqz9bzb1cyrb.jp/"),
            ("http://straße.de/straße", "http://xn--strae-oqa.de/stra%C3%9Fe"),
            (
                "http://straße.de/straße".encode(),
                "http://xn--strae-oqa.de/stra%C3%9Fe",
            ),
            (
                "http://Königsgäßchen.de/straße",
                "http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe",
            ),
            (
                "http://Königsgäßchen.de/straße".encode(),
                "http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe",
            ),
            (b"http://xn--n3h.net/", "http://xn--n3h.net/"),
            (
                b"http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/",
                "http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/",
            ),
            (
                "http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/",
                "http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/",
            ),
        ),
    )
    def test_preparing_url(self, url, expected):
        """Text and bytes URLs are prepared into the expected encoded form."""

        def normalize_percent_encode(x):
            # Helper function that normalizes equivalent
            # percent-encoded bytes before comparisons
            return re.sub(r"%[a-fA-F0-9]{2}", lambda m: m.group(0).upper(), x)

        r = requests.Request("GET", url=url)
        p = r.prepare()
        assert normalize_percent_encode(p.url) == expected

    @pytest.mark.parametrize(
        "url",
        (
            b"http://*.google.com",
            b"http://*",
            "http://*.google.com",
            "http://*",
            "http://☃.net/",
        ),
    )
    def test_preparing_bad_url(self, url):
        """Preparing a request for one of these URLs raises ``InvalidURL``."""
        r = requests.Request("GET", url=url)
        with pytest.raises(requests.exceptions.InvalidURL):
            r.prepare()

    @pytest.mark.parametrize("url, exception", (("http://:1", InvalidURL),))
    def test_redirecting_to_bad_url(self, httpbin, url, exception):
        """Following a redirect to a bad URL raises the given exception."""
        with pytest.raises(exception):
            requests.get(httpbin("redirect-to"), params={"url": url})

    @pytest.mark.parametrize(
        "url, expected",
        (
            (
                b"http+unix://%2Fvar%2Frun%2Fsocket/path%7E",
                "http+unix://%2Fvar%2Frun%2Fsocket/path~",
            ),
            (
                "http+unix://%2Fvar%2Frun%2Fsocket/path%7E",
                "http+unix://%2Fvar%2Frun%2Fsocket/path~",
            ),
            (
                b"mailto:user@example.org",
                "mailto:user@example.org",
            ),
            (
                "mailto:user@example.org",
                "mailto:user@example.org",
            ),
            (
                b"data:SSDimaUgUHl0aG9uIQ==",
                "data:SSDimaUgUHl0aG9uIQ==",
            ),
        ),
    )
    def test_url_mutation(self, url, expected):
        """
        This test validates that we correctly exclude some URLs from
        preparation, and that we handle others. Specifically, it tests that
        any URL whose scheme doesn't begin with "http" is left alone, and
        those whose scheme *does* begin with "http" are mutated.
        """
        r = requests.Request("GET", url=url)
        p = r.prepare()
        assert p.url == expected

    @pytest.mark.parametrize(
        "url, params, expected",
        (
            (
                b"http+unix://%2Fvar%2Frun%2Fsocket/path",
                {"key": "value"},
                "http+unix://%2Fvar%2Frun%2Fsocket/path?key=value",
            ),
            (
                "http+unix://%2Fvar%2Frun%2Fsocket/path",
                {"key": "value"},
                "http+unix://%2Fvar%2Frun%2Fsocket/path?key=value",
            ),
            (
                b"mailto:user@example.org",
                {"key": "value"},
                "mailto:user@example.org",
            ),
            (
                "mailto:user@example.org",
                {"key": "value"},
                "mailto:user@example.org",
            ),
        ),
    )
    def test_parameters_for_nonstandard_schemes(self, url, params, expected):
        """
        Setting parameters for nonstandard schemes is allowed if those schemes
        begin with "http", and is forbidden otherwise.
        """
        r = requests.Request("GET", url=url, params=params)
        p = r.prepare()
        assert p.url == expected

    def test_post_json_nan(self, httpbin):
        """Posting a NaN float as JSON raises ``InvalidJSONError``."""
        data = {"foo": float("nan")}
        with pytest.raises(requests.exceptions.InvalidJSONError):
            requests.post(httpbin("post"), json=data)

    def test_json_decode_compatibility(self, httpbin):
        """The JSON decode error is both a requests and a compat JSON error."""
        r = requests.get(httpbin("bytes/20"))
        with pytest.raises(requests.exceptions.JSONDecodeError) as excinfo:
            r.json()
        assert isinstance(excinfo.value, RequestException)
        assert isinstance(excinfo.value, JSONDecodeError)
        assert r.text not in str(excinfo.value)

    def test_json_decode_persists_doc_attr(self, httpbin):
        """The decode error keeps the response text in its ``doc`` attribute."""
        r = requests.get(httpbin("bytes/20"))
        with pytest.raises(requests.exceptions.JSONDecodeError) as excinfo:
            r.json()
        assert excinfo.value.doc == r.text

    def test_status_code_425(self):
        """Each of these names, in either case, resolves to status code 425."""
        names = (
            "TOO_EARLY",
            "too_early",
            "UNORDERED",
            "unordered",
            "UNORDERED_COLLECTION",
            "unordered_collection",
        )
        for name in names:
            assert requests.codes.get(name) == 425

    def test_different_connection_pool_for_tls_settings_verify_True(self):
        """``verify=False`` and default verification end up in separate pools."""
        s = requests.Session()
        close_server = threading.Event()
        server = TLSServer(
            handler=self._send_canned_response,
            wait_to_close_event=close_server,
            requests_to_handle=3,
            cert_chain="tests/certs/expired/server/server.pem",
            keyfile="tests/certs/expired/server/server.key",
        )

        with server as (host, port):
            url = f"https://{host}:{port}"
            r1 = s.get(url, verify=False)
            assert r1.status_code == 200

            # Cannot verify self-signed certificate
            with pytest.raises(requests.exceptions.SSLError):
                s.get(url)

            close_server.set()
        assert len(s.adapters["https://"].poolmanager.pools) == 2

    def test_different_connection_pool_for_tls_settings_verify_bundle_expired_cert(
        self,
    ):
        """``verify=False`` and a CA-bundle path end up in separate pools."""
        s = requests.Session()
        close_server = threading.Event()
        server = TLSServer(
            handler=self._send_canned_response,
            wait_to_close_event=close_server,
            requests_to_handle=3,
            cert_chain="tests/certs/expired/server/server.pem",
            keyfile="tests/certs/expired/server/server.key",
        )

        with server as (host, port):
            url = f"https://{host}:{port}"
            r1 = s.get(url, verify=False)
            assert r1.status_code == 200

            # Has right trust bundle, but certificate expired
            with pytest.raises(requests.exceptions.SSLError):
                s.get(url, verify="tests/certs/expired/ca/ca.crt")

            close_server.set()
        assert len(s.adapters["https://"].poolmanager.pools) == 2

    def test_different_connection_pool_for_tls_settings_verify_bundle_unexpired_cert(
        self,
    ):
        """Two successful requests with different ``verify`` use separate pools."""
        s = requests.Session()
        close_server = threading.Event()
        server = TLSServer(
            handler=self._send_canned_response,
            wait_to_close_event=close_server,
            requests_to_handle=3,
            cert_chain="tests/certs/valid/server/server.pem",
            keyfile="tests/certs/valid/server/server.key",
        )

        with server as (host, port):
            url = f"https://{host}:{port}"
            r1 = s.get(url, verify=False)
            assert r1.status_code == 200

            r2 = s.get(url, verify="tests/certs/valid/ca/ca.crt")
            assert r2.status_code == 200

            close_server.set()
        assert len(s.adapters["https://"].poolmanager.pools) == 2

    def test_different_connection_pool_for_mtls_settings(self):
        """With a client cert, ``verify=False`` succeeds and default verify fails."""
        client_cert = None

        def response_handler(sock):
            # Capture the peer certificate seen by the server before replying.
            nonlocal client_cert
            client_cert = sock.getpeercert()
            self._send_canned_response(sock)

        s = requests.Session()
        close_server = threading.Event()
        server = TLSServer(
            handler=response_handler,
            wait_to_close_event=close_server,
            requests_to_handle=2,
            cert_chain="tests/certs/expired/server/server.pem",
            keyfile="tests/certs/expired/server/server.key",
            mutual_tls=True,
            cacert="tests/certs/expired/ca/ca.crt",
        )

        cert = (
            "tests/certs/mtls/client/client.pem",
            "tests/certs/mtls/client/client.key",
        )
        with server as (host, port):
            url = f"https://{host}:{port}"
            r1 = s.get(url, verify=False, cert=cert)
            assert r1.status_code == 200
            with pytest.raises(requests.exceptions.SSLError):
                s.get(url, cert=cert)
            close_server.set()

        assert client_cert is not None
3036
3037
3038
def test_content_length_for_bytes_data(httpbin):
    """Content-Length for a bytes body equals the number of bytes."""
    text = "This is a string containing multi-byte UTF-8 ☃️"
    payload = text.encode("utf-8")
    prepared = requests.Request("POST", httpbin("post"), data=payload).prepare()

    assert prepared.headers["Content-Length"] == str(len(payload))
3046
3047
3048
@pytest.mark.skipif(
    is_urllib3_1,
    reason="urllib3 2.x encodes all strings to utf-8, urllib3 1.x uses latin-1",
)
def test_content_length_for_string_data_counts_bytes(httpbin):
    """Content-Length for a str body is its UTF-8 byte count, not its length."""
    text = "This is a string containing multi-byte UTF-8 ☃️"
    expected_length = str(len(text.encode("utf-8")))
    prepared = requests.Request("POST", httpbin("post"), data=text).prepare()

    assert prepared.headers["Content-Length"] == expected_length
3059
3060
3061
def test_json_decode_errors_are_serializable_deserializable():
    """A ``JSONDecodeError`` keeps the same repr after a pickle round trip."""
    single_object = '{"responseCode":["706"],"data":null}'
    # Two JSON objects back to back; position 36 is where the second starts.
    document = single_object + single_object
    original = requests.exceptions.JSONDecodeError("Extra data", document, 36)

    restored = pickle.loads(pickle.dumps(original))
    assert repr(original) == repr(restored)
3069