1#[cfg(feature = "__tls")]
2use http::header::HeaderValue;
3#[cfg(feature = "__tls")]
4use http::uri::Scheme;
5use http::Uri;
6use hyper::rt::{Read, ReadBufCursor, Write};
7use hyper_util::client::legacy::connect::{Connected, Connection};
8#[cfg(any(feature = "socks", feature = "__tls", unix, target_os = "windows"))]
9use hyper_util::rt::TokioIo;
10#[cfg(feature = "default-tls")]
11use native_tls_crate::{TlsConnector, TlsConnectorBuilder};
12use pin_project_lite::pin_project;
13use tower::util::{BoxCloneSyncServiceLayer, MapRequestLayer};
14use tower::{timeout::TimeoutLayer, util::BoxCloneSyncService, ServiceBuilder};
15use tower_service::Service;
16
17use std::future::Future;
18use std::io::{self, IoSlice};
19use std::net::IpAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use std::time::Duration;
24
25#[cfg(feature = "default-tls")]
26use self::native_tls_conn::NativeTlsConn;
27#[cfg(feature = "__rustls")]
28use self::rustls_tls_conn::RustlsTlsConn;
29use crate::dns::DynResolver;
30use crate::error::{cast_to_internal_error, BoxError};
31use crate::proxy::{Intercepted, Matcher as ProxyMatcher};
32use sealed::{Conn, Unnameable};
33
34pub(crate) type HttpConnector = hyper_util::client::legacy::connect::HttpConnector<DynResolver>;
35
36#[derive(Clone)]
37pub(crate) enum Connector {
38 Simple(ConnectorService),
40 WithLayers(BoxCloneSyncService<Unnameable, Conn, BoxError>),
43}
44
45impl Service<Uri> for Connector {
46 type Response = Conn;
47 type Error = BoxError;
48 type Future = Connecting;
49
50 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
51 match self {
52 Connector::Simple(service) => service.poll_ready(cx),
53 Connector::WithLayers(service) => service.poll_ready(cx),
54 }
55 }
56
57 fn call(&mut self, dst: Uri) -> Self::Future {
58 match self {
59 Connector::Simple(service) => service.call(dst),
60 Connector::WithLayers(service) => service.call(Unnameable(dst)),
61 }
62 }
63}
64
65pub(crate) type BoxedConnectorService = BoxCloneSyncService<Unnameable, Conn, BoxError>;
66
67pub(crate) type BoxedConnectorLayer =
68 BoxCloneSyncServiceLayer<BoxedConnectorService, Unnameable, Conn, BoxError>;
69
70pub(crate) struct ConnectorBuilder {
71 inner: Inner,
72 proxies: Arc<Vec<ProxyMatcher>>,
73 verbose: verbose::Wrapper,
74 timeout: Option<Duration>,
75 #[cfg(feature = "__tls")]
76 nodelay: bool,
77 #[cfg(feature = "__tls")]
78 tls_info: bool,
79 #[cfg(feature = "__tls")]
80 user_agent: Option<HeaderValue>,
81 #[cfg(feature = "socks")]
82 resolver: Option<DynResolver>,
83 #[cfg(unix)]
84 unix_socket: Option<Arc<std::path::Path>>,
85 #[cfg(target_os = "windows")]
86 windows_named_pipe: Option<Arc<std::ffi::OsStr>>,
87}
88
89impl ConnectorBuilder {
90 pub(crate) fn build(self, layers: Vec<BoxedConnectorLayer>) -> Connector
91where {
92 let mut base_service = ConnectorService {
94 inner: self.inner,
95 proxies: self.proxies,
96 verbose: self.verbose,
97 #[cfg(feature = "__tls")]
98 nodelay: self.nodelay,
99 #[cfg(feature = "__tls")]
100 tls_info: self.tls_info,
101 #[cfg(feature = "__tls")]
102 user_agent: self.user_agent,
103 simple_timeout: None,
104 #[cfg(feature = "socks")]
105 resolver: self.resolver.unwrap_or_else(DynResolver::gai),
106 #[cfg(unix)]
107 unix_socket: self.unix_socket,
108 #[cfg(target_os = "windows")]
109 windows_named_pipe: self.windows_named_pipe,
110 };
111
112 #[cfg(unix)]
113 if base_service.unix_socket.is_some() && !base_service.proxies.is_empty() {
114 base_service.proxies = Default::default();
115 log::trace!("unix_socket() set, proxies are ignored");
116 }
117 #[cfg(target_os = "windows")]
118 if base_service.windows_named_pipe.is_some() && !base_service.proxies.is_empty() {
119 base_service.proxies = Default::default();
120 log::trace!("windows_named_pipe() set, proxies are ignored");
121 }
122
123 if layers.is_empty() {
124 base_service.simple_timeout = self.timeout;
126 return Connector::Simple(base_service);
127 }
128
129 let unnameable_service = ServiceBuilder::new()
133 .layer(MapRequestLayer::new(|request: Unnameable| request.0))
134 .service(base_service);
135 let mut service = BoxCloneSyncService::new(unnameable_service);
136
137 for layer in layers {
138 service = ServiceBuilder::new().layer(layer).service(service);
139 }
140
141 match self.timeout {
145 Some(timeout) => {
146 let service = ServiceBuilder::new()
147 .layer(TimeoutLayer::new(timeout))
148 .service(service);
149 let service = ServiceBuilder::new()
150 .map_err(|error: BoxError| cast_to_internal_error(error))
151 .service(service);
152 let service = BoxCloneSyncService::new(service);
153
154 Connector::WithLayers(service)
155 }
156 None => {
157 let service = ServiceBuilder::new().service(service);
161 let service = ServiceBuilder::new()
162 .map_err(|error: BoxError| cast_to_internal_error(error))
163 .service(service);
164 let service = BoxCloneSyncService::new(service);
165 Connector::WithLayers(service)
166 }
167 }
168 }
169
170 #[cfg(not(feature = "__tls"))]
171 pub(crate) fn new<T>(
172 mut http: HttpConnector,
173 proxies: Arc<Vec<ProxyMatcher>>,
174 local_addr: T,
175 #[cfg(any(
176 target_os = "android",
177 target_os = "fuchsia",
178 target_os = "illumos",
179 target_os = "ios",
180 target_os = "linux",
181 target_os = "macos",
182 target_os = "solaris",
183 target_os = "tvos",
184 target_os = "visionos",
185 target_os = "watchos",
186 ))]
187 interface: Option<&str>,
188 nodelay: bool,
189 ) -> ConnectorBuilder
190 where
191 T: Into<Option<IpAddr>>,
192 {
193 http.set_local_address(local_addr.into());
194 #[cfg(any(
195 target_os = "android",
196 target_os = "fuchsia",
197 target_os = "illumos",
198 target_os = "ios",
199 target_os = "linux",
200 target_os = "macos",
201 target_os = "solaris",
202 target_os = "tvos",
203 target_os = "visionos",
204 target_os = "watchos",
205 ))]
206 if let Some(interface) = interface {
207 http.set_interface(interface.to_owned());
208 }
209 http.set_nodelay(nodelay);
210
211 ConnectorBuilder {
212 inner: Inner::Http(http),
213 proxies,
214 verbose: verbose::OFF,
215 timeout: None,
216 #[cfg(feature = "socks")]
217 resolver: None,
218 #[cfg(unix)]
219 unix_socket: None,
220 #[cfg(target_os = "windows")]
221 windows_named_pipe: None,
222 }
223 }
224
225 #[cfg(feature = "default-tls")]
226 pub(crate) fn new_default_tls<T>(
227 http: HttpConnector,
228 tls: TlsConnectorBuilder,
229 proxies: Arc<Vec<ProxyMatcher>>,
230 user_agent: Option<HeaderValue>,
231 local_addr: T,
232 #[cfg(any(
233 target_os = "android",
234 target_os = "fuchsia",
235 target_os = "illumos",
236 target_os = "ios",
237 target_os = "linux",
238 target_os = "macos",
239 target_os = "solaris",
240 target_os = "tvos",
241 target_os = "visionos",
242 target_os = "watchos",
243 ))]
244 interface: Option<&str>,
245 nodelay: bool,
246 tls_info: bool,
247 ) -> crate::Result<ConnectorBuilder>
248 where
249 T: Into<Option<IpAddr>>,
250 {
251 let tls = tls.build().map_err(crate::error::builder)?;
252 Ok(Self::from_built_default_tls(
253 http,
254 tls,
255 proxies,
256 user_agent,
257 local_addr,
258 #[cfg(any(
259 target_os = "android",
260 target_os = "fuchsia",
261 target_os = "illumos",
262 target_os = "ios",
263 target_os = "linux",
264 target_os = "macos",
265 target_os = "solaris",
266 target_os = "tvos",
267 target_os = "visionos",
268 target_os = "watchos",
269 ))]
270 interface,
271 nodelay,
272 tls_info,
273 ))
274 }
275
276 #[cfg(feature = "default-tls")]
277 pub(crate) fn from_built_default_tls<T>(
278 mut http: HttpConnector,
279 tls: TlsConnector,
280 proxies: Arc<Vec<ProxyMatcher>>,
281 user_agent: Option<HeaderValue>,
282 local_addr: T,
283 #[cfg(any(
284 target_os = "android",
285 target_os = "fuchsia",
286 target_os = "illumos",
287 target_os = "ios",
288 target_os = "linux",
289 target_os = "macos",
290 target_os = "solaris",
291 target_os = "tvos",
292 target_os = "visionos",
293 target_os = "watchos",
294 ))]
295 interface: Option<&str>,
296 nodelay: bool,
297 tls_info: bool,
298 ) -> ConnectorBuilder
299 where
300 T: Into<Option<IpAddr>>,
301 {
302 http.set_local_address(local_addr.into());
303 #[cfg(any(
304 target_os = "android",
305 target_os = "fuchsia",
306 target_os = "illumos",
307 target_os = "ios",
308 target_os = "linux",
309 target_os = "macos",
310 target_os = "solaris",
311 target_os = "tvos",
312 target_os = "visionos",
313 target_os = "watchos",
314 ))]
315 if let Some(interface) = interface {
316 http.set_interface(interface);
317 }
318 http.set_nodelay(nodelay);
319 http.enforce_http(false);
320
321 ConnectorBuilder {
322 inner: Inner::DefaultTls(http, tls),
323 proxies,
324 verbose: verbose::OFF,
325 nodelay,
326 tls_info,
327 user_agent,
328 timeout: None,
329 #[cfg(feature = "socks")]
330 resolver: None,
331 #[cfg(unix)]
332 unix_socket: None,
333 #[cfg(target_os = "windows")]
334 windows_named_pipe: None,
335 }
336 }
337
338 #[cfg(feature = "__rustls")]
339 pub(crate) fn new_rustls_tls<T>(
340 mut http: HttpConnector,
341 tls: rustls::ClientConfig,
342 proxies: Arc<Vec<ProxyMatcher>>,
343 user_agent: Option<HeaderValue>,
344 local_addr: T,
345 #[cfg(any(
346 target_os = "android",
347 target_os = "fuchsia",
348 target_os = "illumos",
349 target_os = "ios",
350 target_os = "linux",
351 target_os = "macos",
352 target_os = "solaris",
353 target_os = "tvos",
354 target_os = "visionos",
355 target_os = "watchos",
356 ))]
357 interface: Option<&str>,
358 nodelay: bool,
359 tls_info: bool,
360 ) -> ConnectorBuilder
361 where
362 T: Into<Option<IpAddr>>,
363 {
364 http.set_local_address(local_addr.into());
365 #[cfg(any(
366 target_os = "android",
367 target_os = "fuchsia",
368 target_os = "illumos",
369 target_os = "ios",
370 target_os = "linux",
371 target_os = "macos",
372 target_os = "solaris",
373 target_os = "tvos",
374 target_os = "visionos",
375 target_os = "watchos",
376 ))]
377 if let Some(interface) = interface {
378 http.set_interface(interface.to_owned());
379 }
380 http.set_nodelay(nodelay);
381 http.enforce_http(false);
382
383 let (tls, tls_proxy) = if proxies.is_empty() {
384 let tls = Arc::new(tls);
385 (tls.clone(), tls)
386 } else {
387 let mut tls_proxy = tls.clone();
388 tls_proxy.alpn_protocols.clear();
389 (Arc::new(tls), Arc::new(tls_proxy))
390 };
391
392 ConnectorBuilder {
393 inner: Inner::RustlsTls {
394 http,
395 tls,
396 tls_proxy,
397 },
398 proxies,
399 verbose: verbose::OFF,
400 nodelay,
401 tls_info,
402 user_agent,
403 timeout: None,
404 #[cfg(feature = "socks")]
405 resolver: None,
406 #[cfg(unix)]
407 unix_socket: None,
408 #[cfg(target_os = "windows")]
409 windows_named_pipe: None,
410 }
411 }
412
413 pub(crate) fn set_timeout(&mut self, timeout: Option<Duration>) {
414 self.timeout = timeout;
415 }
416
417 pub(crate) fn set_verbose(&mut self, enabled: bool) {
418 self.verbose.0 = enabled;
419 }
420
421 pub(crate) fn set_keepalive(&mut self, dur: Option<Duration>) {
422 match &mut self.inner {
423 #[cfg(feature = "default-tls")]
424 Inner::DefaultTls(http, _tls) => http.set_keepalive(dur),
425 #[cfg(feature = "__rustls")]
426 Inner::RustlsTls { http, .. } => http.set_keepalive(dur),
427 #[cfg(not(feature = "__tls"))]
428 Inner::Http(http) => http.set_keepalive(dur),
429 }
430 }
431
432 pub(crate) fn set_keepalive_interval(&mut self, dur: Option<Duration>) {
433 match &mut self.inner {
434 #[cfg(feature = "default-tls")]
435 Inner::DefaultTls(http, _tls) => http.set_keepalive_interval(dur),
436 #[cfg(feature = "__rustls")]
437 Inner::RustlsTls { http, .. } => http.set_keepalive_interval(dur),
438 #[cfg(not(feature = "__tls"))]
439 Inner::Http(http) => http.set_keepalive_interval(dur),
440 }
441 }
442
443 pub(crate) fn set_keepalive_retries(&mut self, retries: Option<u32>) {
444 match &mut self.inner {
445 #[cfg(feature = "default-tls")]
446 Inner::DefaultTls(http, _tls) => http.set_keepalive_retries(retries),
447 #[cfg(feature = "__rustls")]
448 Inner::RustlsTls { http, .. } => http.set_keepalive_retries(retries),
449 #[cfg(not(feature = "__tls"))]
450 Inner::Http(http) => http.set_keepalive_retries(retries),
451 }
452 }
453
454 #[cfg(feature = "socks")]
455 pub(crate) fn set_socks_resolver(&mut self, resolver: DynResolver) {
456 self.resolver = Some(resolver);
457 }
458
459 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
460 pub(crate) fn set_tcp_user_timeout(&mut self, dur: Option<Duration>) {
461 match &mut self.inner {
462 #[cfg(feature = "default-tls")]
463 Inner::DefaultTls(http, _tls) => http.set_tcp_user_timeout(dur),
464 #[cfg(feature = "__rustls")]
465 Inner::RustlsTls { http, .. } => http.set_tcp_user_timeout(dur),
466 #[cfg(not(feature = "__tls"))]
467 Inner::Http(http) => http.set_tcp_user_timeout(dur),
468 }
469 }
470
471 #[cfg(unix)]
472 pub(crate) fn set_unix_socket(&mut self, path: Option<Arc<std::path::Path>>) {
473 self.unix_socket = path;
474 }
475
476 #[cfg(target_os = "windows")]
477 pub(crate) fn set_windows_named_pipe(&mut self, pipe: Option<Arc<std::ffi::OsStr>>) {
478 self.windows_named_pipe = pipe;
479 }
480}
481
482#[allow(missing_debug_implementations)]
483#[derive(Clone)]
484pub(crate) struct ConnectorService {
485 inner: Inner,
486 proxies: Arc<Vec<ProxyMatcher>>,
487 verbose: verbose::Wrapper,
488 simple_timeout: Option<Duration>,
493 #[cfg(feature = "__tls")]
494 nodelay: bool,
495 #[cfg(feature = "__tls")]
496 tls_info: bool,
497 #[cfg(feature = "__tls")]
498 user_agent: Option<HeaderValue>,
499 #[cfg(feature = "socks")]
500 resolver: DynResolver,
501 #[cfg(unix)]
503 unix_socket: Option<Arc<std::path::Path>>,
504 #[cfg(target_os = "windows")]
505 windows_named_pipe: Option<Arc<std::ffi::OsStr>>,
506}
507
508#[derive(Clone)]
509enum Inner {
510 #[cfg(not(feature = "__tls"))]
511 Http(HttpConnector),
512 #[cfg(feature = "default-tls")]
513 DefaultTls(HttpConnector, TlsConnector),
514 #[cfg(feature = "__rustls")]
515 RustlsTls {
516 http: HttpConnector,
517 tls: Arc<rustls::ClientConfig>,
518 tls_proxy: Arc<rustls::ClientConfig>,
519 },
520}
521
522impl Inner {
523 #[cfg(feature = "socks")]
524 fn get_http_connector(&mut self) -> &mut crate::connect::HttpConnector {
525 match self {
526 #[cfg(feature = "default-tls")]
527 Inner::DefaultTls(http, _) => http,
528 #[cfg(feature = "__rustls")]
529 Inner::RustlsTls { http, .. } => http,
530 #[cfg(not(feature = "__tls"))]
531 Inner::Http(http) => http,
532 }
533 }
534}
535
536impl ConnectorService {
537 #[cfg(feature = "socks")]
538 async fn connect_socks(mut self, dst: Uri, proxy: Intercepted) -> Result<Conn, BoxError> {
539 let dns = match proxy.uri().scheme_str() {
540 Some("socks4") | Some("socks5") => socks::DnsResolve::Local,
541 Some("socks4a") | Some("socks5h") => socks::DnsResolve::Proxy,
542 _ => {
543 unreachable!("connect_socks is only called for socks proxies");
544 }
545 };
546
547 match &mut self.inner {
548 #[cfg(feature = "default-tls")]
549 Inner::DefaultTls(http, tls) => {
550 if dst.scheme() == Some(&Scheme::HTTPS) {
551 let host = dst.host().ok_or("no host in url")?.to_string();
552 let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
553 let conn = TokioIo::new(conn);
554 let conn = TokioIo::new(conn);
555 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
556 let io = tls_connector.connect(&host, conn).await?;
557 let io = TokioIo::new(io);
558 return Ok(Conn {
559 inner: self.verbose.wrap(NativeTlsConn { inner: io }),
560 is_proxy: false,
561 tls_info: self.tls_info,
562 });
563 }
564 }
565 #[cfg(feature = "__rustls")]
566 Inner::RustlsTls { http, tls, .. } => {
567 if dst.scheme() == Some(&Scheme::HTTPS) {
568 use std::convert::TryFrom;
569 use tokio_rustls::TlsConnector as RustlsConnector;
570
571 let tls = tls.clone();
572 let host = dst.host().ok_or("no host in url")?.to_string();
573 let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
574 let conn = TokioIo::new(conn);
575 let conn = TokioIo::new(conn);
576 let server_name =
577 rustls_pki_types::ServerName::try_from(host.as_str().to_owned())
578 .map_err(|_| "Invalid Server Name")?;
579 let io = RustlsConnector::from(tls)
580 .connect(server_name, conn)
581 .await?;
582 let io = TokioIo::new(io);
583 return Ok(Conn {
584 inner: self.verbose.wrap(RustlsTlsConn { inner: io }),
585 is_proxy: false,
586 tls_info: false,
587 });
588 }
589 }
590 #[cfg(not(feature = "__tls"))]
591 Inner::Http(http) => {
592 let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
593 return Ok(Conn {
594 inner: self.verbose.wrap(TokioIo::new(conn)),
595 is_proxy: false,
596 tls_info: false,
597 });
598 }
599 }
600
601 let resolver = &self.resolver;
602 let http = self.inner.get_http_connector();
603 socks::connect(proxy, dst, dns, resolver, http)
604 .await
605 .map(|tcp| Conn {
606 inner: self.verbose.wrap(TokioIo::new(tcp)),
607 is_proxy: false,
608 tls_info: false,
609 })
610 .map_err(Into::into)
611 }
612
613 async fn connect_with_maybe_proxy(self, dst: Uri, is_proxy: bool) -> Result<Conn, BoxError> {
614 match self.inner {
615 #[cfg(not(feature = "__tls"))]
616 Inner::Http(mut http) => {
617 let io = http.call(dst).await?;
618 Ok(Conn {
619 inner: self.verbose.wrap(io),
620 is_proxy,
621 tls_info: false,
622 })
623 }
624 #[cfg(feature = "default-tls")]
625 Inner::DefaultTls(http, tls) => {
626 let mut http = http.clone();
627
628 if !self.nodelay && (dst.scheme() == Some(&Scheme::HTTPS)) {
632 http.set_nodelay(true);
633 }
634
635 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
636 let mut http = hyper_tls::HttpsConnector::from((http, tls_connector));
637 let io = http.call(dst).await?;
638
639 if let hyper_tls::MaybeHttpsStream::Https(stream) = io {
640 if !self.nodelay {
641 stream
642 .inner()
643 .get_ref()
644 .get_ref()
645 .get_ref()
646 .inner()
647 .inner()
648 .set_nodelay(false)?;
649 }
650 Ok(Conn {
651 inner: self.verbose.wrap(NativeTlsConn { inner: stream }),
652 is_proxy,
653 tls_info: self.tls_info,
654 })
655 } else {
656 Ok(Conn {
657 inner: self.verbose.wrap(io),
658 is_proxy,
659 tls_info: false,
660 })
661 }
662 }
663 #[cfg(feature = "__rustls")]
664 Inner::RustlsTls { http, tls, .. } => {
665 let mut http = http.clone();
666
667 if !self.nodelay && (dst.scheme() == Some(&Scheme::HTTPS)) {
671 http.set_nodelay(true);
672 }
673
674 let mut http = hyper_rustls::HttpsConnector::from((http, tls.clone()));
675 let io = http.call(dst).await?;
676
677 if let hyper_rustls::MaybeHttpsStream::Https(stream) = io {
678 if !self.nodelay {
679 let (io, _) = stream.inner().get_ref();
680 io.inner().inner().set_nodelay(false)?;
681 }
682 Ok(Conn {
683 inner: self.verbose.wrap(RustlsTlsConn { inner: stream }),
684 is_proxy,
685 tls_info: self.tls_info,
686 })
687 } else {
688 Ok(Conn {
689 inner: self.verbose.wrap(io),
690 is_proxy,
691 tls_info: false,
692 })
693 }
694 }
695 }
696 }
697
698 #[cfg(any(unix, target_os = "windows"))]
700 async fn connect_local_transport(self, dst: Uri) -> Result<Conn, BoxError> {
701 #[cfg(unix)]
702 let svc = {
703 let path = self
704 .unix_socket
705 .as_ref()
706 .expect("connect local must have socket path")
707 .clone();
708 tower::service_fn(move |_| {
709 let fut = tokio::net::UnixStream::connect(path.clone());
710 async move {
711 let io = fut.await?;
712 Ok::<_, std::io::Error>(TokioIo::new(io))
713 }
714 })
715 };
716 #[cfg(target_os = "windows")]
717 let svc = {
718 use tokio::net::windows::named_pipe::ClientOptions;
719 let pipe = self
720 .windows_named_pipe
721 .as_ref()
722 .expect("connect local must have pipe path")
723 .clone();
724 tower::service_fn(move |_| {
725 let pipe = pipe.clone();
726 async move { ClientOptions::new().open(pipe).map(TokioIo::new) }
727 })
728 };
729 let is_proxy = false;
730 match self.inner {
731 #[cfg(not(feature = "__tls"))]
732 Inner::Http(..) => {
733 let mut svc = svc;
734 let io = svc.call(dst).await?;
735 Ok(Conn {
736 inner: self.verbose.wrap(io),
737 is_proxy,
738 tls_info: false,
739 })
740 }
741 #[cfg(feature = "default-tls")]
742 Inner::DefaultTls(_, tls) => {
743 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
744 let mut http = hyper_tls::HttpsConnector::from((svc, tls_connector));
745 let io = http.call(dst).await?;
746
747 if let hyper_tls::MaybeHttpsStream::Https(stream) = io {
748 Ok(Conn {
749 inner: self.verbose.wrap(NativeTlsConn { inner: stream }),
750 is_proxy,
751 tls_info: self.tls_info,
752 })
753 } else {
754 Ok(Conn {
755 inner: self.verbose.wrap(io),
756 is_proxy,
757 tls_info: false,
758 })
759 }
760 }
761 #[cfg(feature = "__rustls")]
762 Inner::RustlsTls { tls, .. } => {
763 let mut http = hyper_rustls::HttpsConnector::from((svc, tls.clone()));
764 let io = http.call(dst).await?;
765
766 if let hyper_rustls::MaybeHttpsStream::Https(stream) = io {
767 Ok(Conn {
768 inner: self.verbose.wrap(RustlsTlsConn { inner: stream }),
769 is_proxy,
770 tls_info: self.tls_info,
771 })
772 } else {
773 Ok(Conn {
774 inner: self.verbose.wrap(io),
775 is_proxy,
776 tls_info: false,
777 })
778 }
779 }
780 }
781 }
782
783 async fn connect_via_proxy(self, dst: Uri, proxy: Intercepted) -> Result<Conn, BoxError> {
784 log::debug!("proxy({proxy:?}) intercepts '{dst:?}'");
785
786 #[cfg(feature = "socks")]
787 match proxy.uri().scheme_str().ok_or("proxy scheme expected")? {
788 "socks4" | "socks4a" | "socks5" | "socks5h" => {
789 return self.connect_socks(dst, proxy).await
790 }
791 _ => (),
792 }
793
794 let proxy_dst = proxy.uri().clone();
795 #[cfg(feature = "__tls")]
796 let auth = proxy.basic_auth().cloned();
797
798 #[cfg(feature = "__tls")]
799 let misc = proxy.custom_headers().clone();
800
801 match &self.inner {
802 #[cfg(feature = "default-tls")]
803 Inner::DefaultTls(http, tls) => {
804 if dst.scheme() == Some(&Scheme::HTTPS) {
805 log::trace!("tunneling HTTPS over proxy");
806 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
807 let inner =
808 hyper_tls::HttpsConnector::from((http.clone(), tls_connector.clone()));
809 let mut tunnel =
811 hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_dst, inner);
812 if let Some(auth) = auth {
813 tunnel = tunnel.with_auth(auth);
814 }
815 if let Some(ua) = self.user_agent {
816 let mut headers = http::HeaderMap::new();
817 headers.insert(http::header::USER_AGENT, ua);
818 tunnel = tunnel.with_headers(headers);
819 }
820 if let Some(custom_headers) = misc {
822 tunnel = tunnel.with_headers(custom_headers.clone());
823 }
824 let tunneled = tunnel.call(dst.clone()).await?;
827 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
828 let io = tls_connector
829 .connect(dst.host().ok_or("no host in url")?, TokioIo::new(tunneled))
830 .await?;
831 return Ok(Conn {
832 inner: self.verbose.wrap(NativeTlsConn {
833 inner: TokioIo::new(io),
834 }),
835 is_proxy: false,
836 tls_info: false,
837 });
838 }
839 }
840 #[cfg(feature = "__rustls")]
841 Inner::RustlsTls {
842 http,
843 tls,
844 tls_proxy,
845 } => {
846 if dst.scheme() == Some(&Scheme::HTTPS) {
847 use rustls_pki_types::ServerName;
848 use std::convert::TryFrom;
849 use tokio_rustls::TlsConnector as RustlsConnector;
850
851 log::trace!("tunneling HTTPS over proxy");
852 let http = http.clone();
853 let inner = hyper_rustls::HttpsConnector::from((http, tls_proxy.clone()));
854 let mut tunnel =
856 hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_dst, inner);
857 if let Some(auth) = auth {
858 tunnel = tunnel.with_auth(auth);
859 }
860 if let Some(custom_headers) = misc {
861 tunnel = tunnel.with_headers(custom_headers.clone());
862 }
863 if let Some(ua) = self.user_agent {
864 let mut headers = http::HeaderMap::new();
865 headers.insert(http::header::USER_AGENT, ua);
866 tunnel = tunnel.with_headers(headers);
867 }
868 let tunneled = tunnel.call(dst.clone()).await?;
871 let host = dst.host().ok_or("no host in url")?.to_string();
872 let server_name = ServerName::try_from(host.as_str().to_owned())
873 .map_err(|_| "Invalid Server Name")?;
874 let io = RustlsConnector::from(tls.clone())
875 .connect(server_name, TokioIo::new(tunneled))
876 .await?;
877
878 return Ok(Conn {
879 inner: self.verbose.wrap(RustlsTlsConn {
880 inner: TokioIo::new(io),
881 }),
882 is_proxy: false,
883 tls_info: false,
884 });
885 }
886 }
887 #[cfg(not(feature = "__tls"))]
888 Inner::Http(_) => (),
889 }
890
891 self.connect_with_maybe_proxy(proxy_dst, true).await
892 }
893
894 #[cfg(any(unix, target_os = "windows"))]
895 fn should_use_local_transport(&self) -> bool {
896 #[cfg(unix)]
897 return self.unix_socket.is_some();
898
899 #[cfg(target_os = "windows")]
900 return self.windows_named_pipe.is_some();
901 }
902}
903
904async fn with_timeout<T, F>(f: F, timeout: Option<Duration>) -> Result<T, BoxError>
905where
906 F: Future<Output = Result<T, BoxError>>,
907{
908 if let Some(to) = timeout {
909 match tokio::time::timeout(to, f).await {
910 Err(_elapsed) => Err(Box::new(crate::error::TimedOut) as BoxError),
911 Ok(Ok(try_res)) => Ok(try_res),
912 Ok(Err(e)) => Err(e),
913 }
914 } else {
915 f.await
916 }
917}
918
919impl Service<Uri> for ConnectorService {
920 type Response = Conn;
921 type Error = BoxError;
922 type Future = Connecting;
923
924 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
925 Poll::Ready(Ok(()))
926 }
927
928 fn call(&mut self, dst: Uri) -> Self::Future {
929 log::debug!("starting new connection: {dst:?}");
930 let timeout = self.simple_timeout;
931
932 #[cfg(any(unix, target_os = "windows"))]
934 if self.should_use_local_transport() {
935 return Box::pin(with_timeout(
936 self.clone().connect_local_transport(dst),
937 timeout,
938 ));
939 }
940
941 for prox in self.proxies.iter() {
942 if let Some(intercepted) = prox.intercept(&dst) {
943 return Box::pin(with_timeout(
944 self.clone().connect_via_proxy(dst, intercepted),
945 timeout,
946 ));
947 }
948 }
949
950 Box::pin(with_timeout(
951 self.clone().connect_with_maybe_proxy(dst, false),
952 timeout,
953 ))
954 }
955}
956
957#[cfg(feature = "__tls")]
958trait TlsInfoFactory {
959 fn tls_info(&self) -> Option<crate::tls::TlsInfo>;
960}
961
962#[cfg(feature = "__tls")]
963impl<T: TlsInfoFactory> TlsInfoFactory for TokioIo<T> {
964 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
965 self.inner().tls_info()
966 }
967}
968
969#[cfg(feature = "__tls")]
972impl TlsInfoFactory for tokio::net::TcpStream {
973 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
974 None
975 }
976}
977
978#[cfg(feature = "default-tls")]
979impl TlsInfoFactory for tokio_native_tls::TlsStream<TokioIo<TokioIo<tokio::net::TcpStream>>> {
980 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
981 let peer_certificate = self
982 .get_ref()
983 .peer_certificate()
984 .ok()
985 .flatten()
986 .and_then(|c| c.to_der().ok());
987 Some(crate::tls::TlsInfo { peer_certificate })
988 }
989}
990
991#[cfg(feature = "default-tls")]
992impl TlsInfoFactory
993 for tokio_native_tls::TlsStream<
994 TokioIo<hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>>>,
995 >
996{
997 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
998 let peer_certificate = self
999 .get_ref()
1000 .peer_certificate()
1001 .ok()
1002 .flatten()
1003 .and_then(|c| c.to_der().ok());
1004 Some(crate::tls::TlsInfo { peer_certificate })
1005 }
1006}
1007
1008#[cfg(feature = "default-tls")]
1009impl TlsInfoFactory for hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>> {
1010 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1011 match self {
1012 hyper_tls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1013 hyper_tls::MaybeHttpsStream::Http(_) => None,
1014 }
1015 }
1016}
1017
1018#[cfg(feature = "__rustls")]
1019impl TlsInfoFactory for tokio_rustls::client::TlsStream<TokioIo<TokioIo<tokio::net::TcpStream>>> {
1020 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1021 let peer_certificate = self
1022 .get_ref()
1023 .1
1024 .peer_certificates()
1025 .and_then(|certs| certs.first())
1026 .map(|c| c.to_vec());
1027 Some(crate::tls::TlsInfo { peer_certificate })
1028 }
1029}
1030
1031#[cfg(feature = "__rustls")]
1032impl TlsInfoFactory
1033 for tokio_rustls::client::TlsStream<
1034 TokioIo<hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>>>,
1035 >
1036{
1037 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1038 let peer_certificate = self
1039 .get_ref()
1040 .1
1041 .peer_certificates()
1042 .and_then(|certs| certs.first())
1043 .map(|c| c.to_vec());
1044 Some(crate::tls::TlsInfo { peer_certificate })
1045 }
1046}
1047
1048#[cfg(feature = "__rustls")]
1049impl TlsInfoFactory for hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>> {
1050 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1051 match self {
1052 hyper_rustls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1053 hyper_rustls::MaybeHttpsStream::Http(_) => None,
1054 }
1055 }
1056}
1057
1058#[cfg(feature = "__tls")]
1061#[cfg(unix)]
1062impl TlsInfoFactory for tokio::net::UnixStream {
1063 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1064 None
1065 }
1066}
1067
1068#[cfg(feature = "default-tls")]
1069#[cfg(unix)]
1070impl TlsInfoFactory for tokio_native_tls::TlsStream<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1071 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1072 let peer_certificate = self
1073 .get_ref()
1074 .peer_certificate()
1075 .ok()
1076 .flatten()
1077 .and_then(|c| c.to_der().ok());
1078 Some(crate::tls::TlsInfo { peer_certificate })
1079 }
1080}
1081
1082#[cfg(feature = "default-tls")]
1083#[cfg(unix)]
1084impl TlsInfoFactory
1085 for tokio_native_tls::TlsStream<
1086 TokioIo<hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>,
1087 >
1088{
1089 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1090 let peer_certificate = self
1091 .get_ref()
1092 .peer_certificate()
1093 .ok()
1094 .flatten()
1095 .and_then(|c| c.to_der().ok());
1096 Some(crate::tls::TlsInfo { peer_certificate })
1097 }
1098}
1099
1100#[cfg(feature = "default-tls")]
1101#[cfg(unix)]
1102impl TlsInfoFactory for hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>> {
1103 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1104 match self {
1105 hyper_tls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1106 hyper_tls::MaybeHttpsStream::Http(_) => None,
1107 }
1108 }
1109}
1110
1111#[cfg(feature = "__rustls")]
1112#[cfg(unix)]
1113impl TlsInfoFactory for tokio_rustls::client::TlsStream<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1114 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1115 let peer_certificate = self
1116 .get_ref()
1117 .1
1118 .peer_certificates()
1119 .and_then(|certs| certs.first())
1120 .map(|c| c.to_vec());
1121 Some(crate::tls::TlsInfo { peer_certificate })
1122 }
1123}
1124
1125#[cfg(feature = "__rustls")]
1126#[cfg(unix)]
1127impl TlsInfoFactory
1128 for tokio_rustls::client::TlsStream<
1129 TokioIo<hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>,
1130 >
1131{
1132 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1133 let peer_certificate = self
1134 .get_ref()
1135 .1
1136 .peer_certificates()
1137 .and_then(|certs| certs.first())
1138 .map(|c| c.to_vec());
1139 Some(crate::tls::TlsInfo { peer_certificate })
1140 }
1141}
1142
1143#[cfg(feature = "__rustls")]
1144#[cfg(unix)]
1145impl TlsInfoFactory for hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>> {
1146 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1147 match self {
1148 hyper_rustls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1149 hyper_rustls::MaybeHttpsStream::Http(_) => None,
1150 }
1151 }
1152}
1153
1154#[cfg(feature = "__tls")]
1157#[cfg(target_os = "windows")]
1158impl TlsInfoFactory for tokio::net::windows::named_pipe::NamedPipeClient {
1159 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1160 None
1161 }
1162}
1163
1164#[cfg(feature = "default-tls")]
1165#[cfg(target_os = "windows")]
1166impl TlsInfoFactory
1167 for tokio_native_tls::TlsStream<
1168 TokioIo<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>,
1169 >
1170{
1171 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1172 let peer_certificate = self
1173 .get_ref()
1174 .peer_certificate()
1175 .ok()
1176 .flatten()
1177 .and_then(|c| c.to_der().ok());
1178 Some(crate::tls::TlsInfo { peer_certificate })
1179 }
1180}
1181
1182#[cfg(feature = "default-tls")]
1183#[cfg(target_os = "windows")]
1184impl TlsInfoFactory
1185 for tokio_native_tls::TlsStream<
1186 TokioIo<
1187 hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>,
1188 >,
1189 >
1190{
1191 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1192 let peer_certificate = self
1193 .get_ref()
1194 .peer_certificate()
1195 .ok()
1196 .flatten()
1197 .and_then(|c| c.to_der().ok());
1198 Some(crate::tls::TlsInfo { peer_certificate })
1199 }
1200}
1201
1202#[cfg(feature = "default-tls")]
1203#[cfg(target_os = "windows")]
1204impl TlsInfoFactory
1205 for hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>
1206{
1207 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1208 match self {
1209 hyper_tls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1210 hyper_tls::MaybeHttpsStream::Http(_) => None,
1211 }
1212 }
1213}
1214
1215#[cfg(feature = "__rustls")]
1216#[cfg(target_os = "windows")]
1217impl TlsInfoFactory
1218 for tokio_rustls::client::TlsStream<
1219 TokioIo<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>,
1220 >
1221{
1222 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1223 let peer_certificate = self
1224 .get_ref()
1225 .1
1226 .peer_certificates()
1227 .and_then(|certs| certs.first())
1228 .map(|c| c.to_vec());
1229 Some(crate::tls::TlsInfo { peer_certificate })
1230 }
1231}
1232
1233#[cfg(feature = "__rustls")]
1234#[cfg(target_os = "windows")]
1235impl TlsInfoFactory
1236 for tokio_rustls::client::TlsStream<
1237 TokioIo<
1238 hyper_rustls::MaybeHttpsStream<
1239 TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>,
1240 >,
1241 >,
1242 >
1243{
1244 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1245 let peer_certificate = self
1246 .get_ref()
1247 .1
1248 .peer_certificates()
1249 .and_then(|certs| certs.first())
1250 .map(|c| c.to_vec());
1251 Some(crate::tls::TlsInfo { peer_certificate })
1252 }
1253}
1254
1255#[cfg(feature = "__rustls")]
1256#[cfg(target_os = "windows")]
1257impl TlsInfoFactory
1258 for hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>
1259{
1260 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1261 match self {
1262 hyper_rustls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1263 hyper_rustls::MaybeHttpsStream::Http(_) => None,
1264 }
1265 }
1266}
1267
1268pub(crate) trait AsyncConn:
1269 Read + Write + Connection + Send + Sync + Unpin + 'static
1270{
1271}
1272
1273impl<T: Read + Write + Connection + Send + Sync + Unpin + 'static> AsyncConn for T {}
1274
1275#[cfg(feature = "__tls")]
1276trait AsyncConnWithInfo: AsyncConn + TlsInfoFactory {}
1277#[cfg(not(feature = "__tls"))]
1278trait AsyncConnWithInfo: AsyncConn {}
1279
1280#[cfg(feature = "__tls")]
1281impl<T: AsyncConn + TlsInfoFactory> AsyncConnWithInfo for T {}
1282#[cfg(not(feature = "__tls"))]
1283impl<T: AsyncConn> AsyncConnWithInfo for T {}
1284
1285type BoxConn = Box<dyn AsyncConnWithInfo>;
1286
1287pub(crate) mod sealed {
1288 use super::*;
1289 #[derive(Debug)]
1290 pub struct Unnameable(pub(super) Uri);
1291
1292 pin_project! {
1293 #[allow(missing_debug_implementations)]
1298 pub struct Conn {
1299 #[pin]
1300 pub(super)inner: BoxConn,
1301 pub(super) is_proxy: bool,
1302 pub(super) tls_info: bool,
1304 }
1305 }
1306
1307 impl Connection for Conn {
1308 fn connected(&self) -> Connected {
1309 let connected = self.inner.connected().proxy(self.is_proxy);
1310 #[cfg(feature = "__tls")]
1311 if self.tls_info {
1312 if let Some(tls_info) = self.inner.tls_info() {
1313 connected.extra(tls_info)
1314 } else {
1315 connected
1316 }
1317 } else {
1318 connected
1319 }
1320 #[cfg(not(feature = "__tls"))]
1321 connected
1322 }
1323 }
1324
1325 impl Read for Conn {
1326 fn poll_read(
1327 self: Pin<&mut Self>,
1328 cx: &mut Context,
1329 buf: ReadBufCursor<'_>,
1330 ) -> Poll<io::Result<()>> {
1331 let this = self.project();
1332 Read::poll_read(this.inner, cx, buf)
1333 }
1334 }
1335
1336 impl Write for Conn {
1337 fn poll_write(
1338 self: Pin<&mut Self>,
1339 cx: &mut Context,
1340 buf: &[u8],
1341 ) -> Poll<Result<usize, io::Error>> {
1342 let this = self.project();
1343 Write::poll_write(this.inner, cx, buf)
1344 }
1345
1346 fn poll_write_vectored(
1347 self: Pin<&mut Self>,
1348 cx: &mut Context<'_>,
1349 bufs: &[IoSlice<'_>],
1350 ) -> Poll<Result<usize, io::Error>> {
1351 let this = self.project();
1352 Write::poll_write_vectored(this.inner, cx, bufs)
1353 }
1354
1355 fn is_write_vectored(&self) -> bool {
1356 self.inner.is_write_vectored()
1357 }
1358
1359 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
1360 let this = self.project();
1361 Write::poll_flush(this.inner, cx)
1362 }
1363
1364 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
1365 let this = self.project();
1366 Write::poll_shutdown(this.inner, cx)
1367 }
1368 }
1369}
1370
1371#[cfg(unix)]
1373pub(crate) mod uds {
1374 use std::path::Path;
1375
1376 #[cfg(unix)]
1383 pub trait UnixSocketProvider {
1384 #[doc(hidden)]
1385 fn reqwest_uds_path(&self, _: Internal) -> &Path;
1386 }
1387
1388 #[allow(missing_debug_implementations)]
1389 pub struct Internal;
1390
1391 macro_rules! as_path {
1392 ($($t:ty,)+) => {
1393 $(
1394 impl UnixSocketProvider for $t {
1395 #[doc(hidden)]
1396 fn reqwest_uds_path(&self, _: Internal) -> &Path {
1397 self.as_ref()
1398 }
1399 }
1400 )+
1401 }
1402 }
1403
1404 as_path![
1405 String,
1406 &'_ str,
1407 &'_ Path,
1408 std::path::PathBuf,
1409 std::sync::Arc<Path>,
1410 ];
1411}
1412
1413#[cfg(target_os = "windows")]
1415pub(crate) mod windows_named_pipe {
1416 use std::ffi::OsStr;
1417 #[cfg(target_os = "windows")]
1422 pub trait WindowsNamedPipeProvider {
1423 #[doc(hidden)]
1424 fn reqwest_windows_named_pipe_path(&self, _: Internal) -> &OsStr;
1425 }
1426
1427 #[allow(missing_debug_implementations)]
1428 pub struct Internal;
1429
1430 macro_rules! as_os_str {
1431 ($($t:ty,)+) => {
1432 $(
1433 impl WindowsNamedPipeProvider for $t {
1434 #[doc(hidden)]
1435 fn reqwest_windows_named_pipe_path(&self, _: Internal) -> &OsStr {
1436 self.as_ref()
1437 }
1438 }
1439 )+
1440 }
1441 }
1442
1443 as_os_str![
1444 String,
1445 &'_ str,
1446 std::path::PathBuf,
1447 &'_ std::path::Path,
1448 std::ffi::OsString,
1449 &'_ OsStr,
1450 ];
1451}
1452
1453pub(crate) type Connecting = Pin<Box<dyn Future<Output = Result<Conn, BoxError>> + Send>>;
1454
1455#[cfg(feature = "default-tls")]
1456mod native_tls_conn {
1457 use super::TlsInfoFactory;
1458 use hyper::rt::{Read, ReadBufCursor, Write};
1459 use hyper_tls::MaybeHttpsStream;
1460 use hyper_util::client::legacy::connect::{Connected, Connection};
1461 use hyper_util::rt::TokioIo;
1462 use pin_project_lite::pin_project;
1463 use std::{
1464 io::{self, IoSlice},
1465 pin::Pin,
1466 task::{Context, Poll},
1467 };
1468 use tokio::io::{AsyncRead, AsyncWrite};
1469 use tokio::net::TcpStream;
1470 use tokio_native_tls::TlsStream;
1471
1472 pin_project! {
1473 pub(super) struct NativeTlsConn<T> {
1474 #[pin] pub(super) inner: TokioIo<TlsStream<T>>,
1475 }
1476 }
1477
1478 impl Connection for NativeTlsConn<TokioIo<TokioIo<TcpStream>>> {
1479 fn connected(&self) -> Connected {
1480 let connected = self
1481 .inner
1482 .inner()
1483 .get_ref()
1484 .get_ref()
1485 .get_ref()
1486 .inner()
1487 .connected();
1488 #[cfg(feature = "native-tls-alpn")]
1489 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1490 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1491 _ => connected,
1492 }
1493 #[cfg(not(feature = "native-tls-alpn"))]
1494 connected
1495 }
1496 }
1497
1498 impl Connection for NativeTlsConn<TokioIo<MaybeHttpsStream<TokioIo<TcpStream>>>> {
1499 fn connected(&self) -> Connected {
1500 let connected = self
1501 .inner
1502 .inner()
1503 .get_ref()
1504 .get_ref()
1505 .get_ref()
1506 .inner()
1507 .connected();
1508 #[cfg(feature = "native-tls-alpn")]
1509 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1510 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1511 _ => connected,
1512 }
1513 #[cfg(not(feature = "native-tls-alpn"))]
1514 connected
1515 }
1516 }
1517
1518 #[cfg(unix)]
1519 impl Connection for NativeTlsConn<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1520 fn connected(&self) -> Connected {
1521 let connected = Connected::new();
1522 #[cfg(feature = "native-tls-alpn")]
1523 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1524 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1525 _ => connected,
1526 }
1527 #[cfg(not(feature = "native-tls-alpn"))]
1528 connected
1529 }
1530 }
1531
1532 #[cfg(unix)]
1533 impl Connection for NativeTlsConn<TokioIo<MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>> {
1534 fn connected(&self) -> Connected {
1535 let connected = Connected::new();
1536 #[cfg(feature = "native-tls-alpn")]
1537 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1538 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1539 _ => connected,
1540 }
1541 #[cfg(not(feature = "native-tls-alpn"))]
1542 connected
1543 }
1544 }
1545
1546 #[cfg(target_os = "windows")]
1547 impl Connection
1548 for NativeTlsConn<TokioIo<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>>
1549 {
1550 fn connected(&self) -> Connected {
1551 let connected = Connected::new();
1552 #[cfg(feature = "native-tls-alpn")]
1553 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1554 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1555 _ => connected,
1556 }
1557 #[cfg(not(feature = "native-tls-alpn"))]
1558 connected
1559 }
1560 }
1561
1562 #[cfg(target_os = "windows")]
1563 impl Connection
1564 for NativeTlsConn<
1565 TokioIo<MaybeHttpsStream<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>>,
1566 >
1567 {
1568 fn connected(&self) -> Connected {
1569 let connected = Connected::new();
1570 #[cfg(feature = "native-tls-alpn")]
1571 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1572 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1573 _ => connected,
1574 }
1575 #[cfg(not(feature = "native-tls-alpn"))]
1576 connected
1577 }
1578 }
1579
1580 impl<T: AsyncRead + AsyncWrite + Unpin> Read for NativeTlsConn<T> {
1581 fn poll_read(
1582 self: Pin<&mut Self>,
1583 cx: &mut Context,
1584 buf: ReadBufCursor<'_>,
1585 ) -> Poll<tokio::io::Result<()>> {
1586 let this = self.project();
1587 Read::poll_read(this.inner, cx, buf)
1588 }
1589 }
1590
1591 impl<T: AsyncRead + AsyncWrite + Unpin> Write for NativeTlsConn<T> {
1592 fn poll_write(
1593 self: Pin<&mut Self>,
1594 cx: &mut Context,
1595 buf: &[u8],
1596 ) -> Poll<Result<usize, tokio::io::Error>> {
1597 let this = self.project();
1598 Write::poll_write(this.inner, cx, buf)
1599 }
1600
1601 fn poll_write_vectored(
1602 self: Pin<&mut Self>,
1603 cx: &mut Context<'_>,
1604 bufs: &[IoSlice<'_>],
1605 ) -> Poll<Result<usize, io::Error>> {
1606 let this = self.project();
1607 Write::poll_write_vectored(this.inner, cx, bufs)
1608 }
1609
1610 fn is_write_vectored(&self) -> bool {
1611 self.inner.is_write_vectored()
1612 }
1613
1614 fn poll_flush(
1615 self: Pin<&mut Self>,
1616 cx: &mut Context,
1617 ) -> Poll<Result<(), tokio::io::Error>> {
1618 let this = self.project();
1619 Write::poll_flush(this.inner, cx)
1620 }
1621
1622 fn poll_shutdown(
1623 self: Pin<&mut Self>,
1624 cx: &mut Context,
1625 ) -> Poll<Result<(), tokio::io::Error>> {
1626 let this = self.project();
1627 Write::poll_shutdown(this.inner, cx)
1628 }
1629 }
1630
1631 impl<T> TlsInfoFactory for NativeTlsConn<T>
1632 where
1633 TokioIo<TlsStream<T>>: TlsInfoFactory,
1634 {
1635 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1636 self.inner.tls_info()
1637 }
1638 }
1639}
1640
1641#[cfg(feature = "__rustls")]
1642mod rustls_tls_conn {
1643 use super::TlsInfoFactory;
1644 use hyper::rt::{Read, ReadBufCursor, Write};
1645 use hyper_rustls::MaybeHttpsStream;
1646 use hyper_util::client::legacy::connect::{Connected, Connection};
1647 use hyper_util::rt::TokioIo;
1648 use pin_project_lite::pin_project;
1649 use std::{
1650 io::{self, IoSlice},
1651 pin::Pin,
1652 task::{Context, Poll},
1653 };
1654 use tokio::io::{AsyncRead, AsyncWrite};
1655 use tokio::net::TcpStream;
1656 use tokio_rustls::client::TlsStream;
1657
1658 pin_project! {
1659 pub(super) struct RustlsTlsConn<T> {
1660 #[pin] pub(super) inner: TokioIo<TlsStream<T>>,
1661 }
1662 }
1663
1664 impl Connection for RustlsTlsConn<TokioIo<TokioIo<TcpStream>>> {
1665 fn connected(&self) -> Connected {
1666 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1667 self.inner
1668 .inner()
1669 .get_ref()
1670 .0
1671 .inner()
1672 .connected()
1673 .negotiated_h2()
1674 } else {
1675 self.inner.inner().get_ref().0.inner().connected()
1676 }
1677 }
1678 }
1679 impl Connection for RustlsTlsConn<TokioIo<MaybeHttpsStream<TokioIo<TcpStream>>>> {
1680 fn connected(&self) -> Connected {
1681 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1682 self.inner
1683 .inner()
1684 .get_ref()
1685 .0
1686 .inner()
1687 .connected()
1688 .negotiated_h2()
1689 } else {
1690 self.inner.inner().get_ref().0.inner().connected()
1691 }
1692 }
1693 }
1694
1695 #[cfg(unix)]
1696 impl Connection for RustlsTlsConn<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1697 fn connected(&self) -> Connected {
1698 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1699 self.inner
1700 .inner()
1701 .get_ref()
1702 .0
1703 .inner()
1704 .connected()
1705 .negotiated_h2()
1706 } else {
1707 self.inner.inner().get_ref().0.inner().connected()
1708 }
1709 }
1710 }
1711
1712 #[cfg(unix)]
1713 impl Connection for RustlsTlsConn<TokioIo<MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>> {
1714 fn connected(&self) -> Connected {
1715 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1716 self.inner
1717 .inner()
1718 .get_ref()
1719 .0
1720 .inner()
1721 .connected()
1722 .negotiated_h2()
1723 } else {
1724 self.inner.inner().get_ref().0.inner().connected()
1725 }
1726 }
1727 }
1728
1729 #[cfg(target_os = "windows")]
1730 impl Connection
1731 for RustlsTlsConn<TokioIo<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>>
1732 {
1733 fn connected(&self) -> Connected {
1734 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1735 self.inner
1736 .inner()
1737 .get_ref()
1738 .0
1739 .inner()
1740 .connected()
1741 .negotiated_h2()
1742 } else {
1743 self.inner.inner().get_ref().0.inner().connected()
1744 }
1745 }
1746 }
1747
1748 #[cfg(target_os = "windows")]
1749 impl Connection
1750 for RustlsTlsConn<
1751 TokioIo<MaybeHttpsStream<TokioIo<tokio::net::windows::named_pipe::NamedPipeClient>>>,
1752 >
1753 {
1754 fn connected(&self) -> Connected {
1755 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1756 self.inner
1757 .inner()
1758 .get_ref()
1759 .0
1760 .inner()
1761 .connected()
1762 .negotiated_h2()
1763 } else {
1764 self.inner.inner().get_ref().0.inner().connected()
1765 }
1766 }
1767 }
1768
1769 impl<T: AsyncRead + AsyncWrite + Unpin> Read for RustlsTlsConn<T> {
1770 fn poll_read(
1771 self: Pin<&mut Self>,
1772 cx: &mut Context,
1773 buf: ReadBufCursor<'_>,
1774 ) -> Poll<tokio::io::Result<()>> {
1775 let this = self.project();
1776 Read::poll_read(this.inner, cx, buf)
1777 }
1778 }
1779
1780 impl<T: AsyncRead + AsyncWrite + Unpin> Write for RustlsTlsConn<T> {
1781 fn poll_write(
1782 self: Pin<&mut Self>,
1783 cx: &mut Context,
1784 buf: &[u8],
1785 ) -> Poll<Result<usize, tokio::io::Error>> {
1786 let this = self.project();
1787 Write::poll_write(this.inner, cx, buf)
1788 }
1789
1790 fn poll_write_vectored(
1791 self: Pin<&mut Self>,
1792 cx: &mut Context<'_>,
1793 bufs: &[IoSlice<'_>],
1794 ) -> Poll<Result<usize, io::Error>> {
1795 let this = self.project();
1796 Write::poll_write_vectored(this.inner, cx, bufs)
1797 }
1798
1799 fn is_write_vectored(&self) -> bool {
1800 self.inner.is_write_vectored()
1801 }
1802
1803 fn poll_flush(
1804 self: Pin<&mut Self>,
1805 cx: &mut Context,
1806 ) -> Poll<Result<(), tokio::io::Error>> {
1807 let this = self.project();
1808 Write::poll_flush(this.inner, cx)
1809 }
1810
1811 fn poll_shutdown(
1812 self: Pin<&mut Self>,
1813 cx: &mut Context,
1814 ) -> Poll<Result<(), tokio::io::Error>> {
1815 let this = self.project();
1816 Write::poll_shutdown(this.inner, cx)
1817 }
1818 }
1819 impl<T> TlsInfoFactory for RustlsTlsConn<T>
1820 where
1821 TokioIo<TlsStream<T>>: TlsInfoFactory,
1822 {
1823 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1824 self.inner.tls_info()
1825 }
1826 }
1827}
1828
1829#[cfg(feature = "socks")]
1830mod socks {
1831 use tower_service::Service;
1832
1833 use http::uri::Scheme;
1834 use http::Uri;
1835 use hyper_util::client::legacy::connect::proxy::{SocksV4, SocksV5};
1836 use tokio::net::TcpStream;
1837
1838 use super::BoxError;
1839 use crate::proxy::Intercepted;
1840
1841 pub(super) enum DnsResolve {
1842 Local,
1843 Proxy,
1844 }
1845
1846 #[derive(Debug)]
1847 pub(super) enum SocksProxyError {
1848 SocksNoHostInUrl,
1849 SocksLocalResolve(BoxError),
1850 SocksConnect(BoxError),
1851 }
1852
1853 pub(super) async fn connect(
1854 proxy: Intercepted,
1855 dst: Uri,
1856 dns_mode: DnsResolve,
1857 resolver: &crate::dns::DynResolver,
1858 http_connector: &mut crate::connect::HttpConnector,
1859 ) -> Result<TcpStream, SocksProxyError> {
1860 let https = dst.scheme() == Some(&Scheme::HTTPS);
1861 let original_host = dst.host().ok_or(SocksProxyError::SocksNoHostInUrl)?;
1862 let mut host = original_host.to_owned();
1863 let port = match dst.port() {
1864 Some(p) => p.as_u16(),
1865 None if https => 443u16,
1866 _ => 80u16,
1867 };
1868
1869 if let DnsResolve::Local = dns_mode {
1870 let maybe_new_target = resolver
1871 .http_resolve(&dst)
1872 .await
1873 .map_err(SocksProxyError::SocksLocalResolve)?
1874 .next();
1875 if let Some(new_target) = maybe_new_target {
1876 log::trace!("socks local dns resolved {new_target:?}");
1877 let ip = new_target.ip();
1879 if ip.is_ipv6() {
1880 host = format!("[{}]", ip);
1881 } else {
1882 host = ip.to_string();
1883 }
1884 }
1885 }
1886
1887 let proxy_uri = proxy.uri().clone();
1888 let dst_uri = format!(
1890 "{}://{}:{}",
1891 if https { "https" } else { "http" },
1892 host,
1893 port
1894 )
1895 .parse::<Uri>()
1896 .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1897
1898 match proxy.uri().scheme_str() {
1900 Some("socks4") | Some("socks4a") => {
1901 let mut svc = SocksV4::new(proxy_uri, http_connector);
1902 let stream = Service::call(&mut svc, dst_uri)
1903 .await
1904 .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1905 Ok(stream.into_inner())
1906 }
1907 Some("socks5") | Some("socks5h") => {
1908 let mut svc = if let Some((username, password)) = proxy.raw_auth() {
1909 SocksV5::new(proxy_uri, http_connector)
1910 .with_auth(username.to_string(), password.to_string())
1911 } else {
1912 SocksV5::new(proxy_uri, http_connector)
1913 };
1914 let stream = Service::call(&mut svc, dst_uri)
1915 .await
1916 .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1917 Ok(stream.into_inner())
1918 }
1919 _ => unreachable!(),
1920 }
1921 }
1922
1923 impl std::fmt::Display for SocksProxyError {
1924 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1925 match self {
1926 Self::SocksNoHostInUrl => f.write_str("socks proxy destination has no host"),
1927 Self::SocksLocalResolve(_) => f.write_str("error resolving for socks proxy"),
1928 Self::SocksConnect(_) => f.write_str("error connecting to socks proxy"),
1929 }
1930 }
1931 }
1932
1933 impl std::error::Error for SocksProxyError {
1934 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1935 match self {
1936 Self::SocksNoHostInUrl => None,
1937 Self::SocksLocalResolve(ref e) => Some(&**e),
1938 Self::SocksConnect(ref e) => Some(&**e),
1939 }
1940 }
1941 }
1942}
1943
1944mod verbose {
1945 use crate::util::Escape;
1946 use hyper::rt::{Read, ReadBufCursor, Write};
1947 use hyper_util::client::legacy::connect::{Connected, Connection};
1948 use std::cmp::min;
1949 use std::fmt;
1950 use std::io::{self, IoSlice};
1951 use std::pin::Pin;
1952 use std::task::{Context, Poll};
1953
1954 pub(super) const OFF: Wrapper = Wrapper(false);
1955
1956 #[derive(Clone, Copy)]
1957 pub(super) struct Wrapper(pub(super) bool);
1958
1959 impl Wrapper {
1960 pub(super) fn wrap<T: super::AsyncConnWithInfo>(&self, conn: T) -> super::BoxConn {
1961 if self.0 && log::log_enabled!(log::Level::Trace) {
1962 Box::new(Verbose {
1963 id: crate::util::fast_random() as u32,
1965 inner: conn,
1966 })
1967 } else {
1968 Box::new(conn)
1969 }
1970 }
1971 }
1972
1973 struct Verbose<T> {
1974 id: u32,
1975 inner: T,
1976 }
1977
1978 impl<T: Connection + Read + Write + Unpin> Connection for Verbose<T> {
1979 fn connected(&self) -> Connected {
1980 self.inner.connected()
1981 }
1982 }
1983
1984 impl<T: Read + Write + Unpin> Read for Verbose<T> {
1985 fn poll_read(
1986 mut self: Pin<&mut Self>,
1987 cx: &mut Context,
1988 mut buf: ReadBufCursor<'_>,
1989 ) -> Poll<std::io::Result<()>> {
1990 let mut vbuf = hyper::rt::ReadBuf::uninit(unsafe { buf.as_mut() });
1994 match Pin::new(&mut self.inner).poll_read(cx, vbuf.unfilled()) {
1995 Poll::Ready(Ok(())) => {
1996 log::trace!("{:08x} read: {:?}", self.id, Escape::new(vbuf.filled()));
1997 let len = vbuf.filled().len();
1998 unsafe {
2001 buf.advance(len);
2002 }
2003 Poll::Ready(Ok(()))
2004 }
2005 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
2006 Poll::Pending => Poll::Pending,
2007 }
2008 }
2009 }
2010
2011 impl<T: Read + Write + Unpin> Write for Verbose<T> {
2012 fn poll_write(
2013 mut self: Pin<&mut Self>,
2014 cx: &mut Context,
2015 buf: &[u8],
2016 ) -> Poll<Result<usize, std::io::Error>> {
2017 match Pin::new(&mut self.inner).poll_write(cx, buf) {
2018 Poll::Ready(Ok(n)) => {
2019 log::trace!("{:08x} write: {:?}", self.id, Escape::new(&buf[..n]));
2020 Poll::Ready(Ok(n))
2021 }
2022 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
2023 Poll::Pending => Poll::Pending,
2024 }
2025 }
2026
2027 fn poll_write_vectored(
2028 mut self: Pin<&mut Self>,
2029 cx: &mut Context<'_>,
2030 bufs: &[IoSlice<'_>],
2031 ) -> Poll<Result<usize, io::Error>> {
2032 match Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) {
2033 Poll::Ready(Ok(nwritten)) => {
2034 log::trace!(
2035 "{:08x} write (vectored): {:?}",
2036 self.id,
2037 Vectored { bufs, nwritten }
2038 );
2039 Poll::Ready(Ok(nwritten))
2040 }
2041 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
2042 Poll::Pending => Poll::Pending,
2043 }
2044 }
2045
2046 fn is_write_vectored(&self) -> bool {
2047 self.inner.is_write_vectored()
2048 }
2049
2050 fn poll_flush(
2051 mut self: Pin<&mut Self>,
2052 cx: &mut Context,
2053 ) -> Poll<Result<(), std::io::Error>> {
2054 Pin::new(&mut self.inner).poll_flush(cx)
2055 }
2056
2057 fn poll_shutdown(
2058 mut self: Pin<&mut Self>,
2059 cx: &mut Context,
2060 ) -> Poll<Result<(), std::io::Error>> {
2061 Pin::new(&mut self.inner).poll_shutdown(cx)
2062 }
2063 }
2064
2065 #[cfg(feature = "__tls")]
2066 impl<T: super::TlsInfoFactory> super::TlsInfoFactory for Verbose<T> {
2067 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
2068 self.inner.tls_info()
2069 }
2070 }
2071
2072 struct Vectored<'a, 'b> {
2073 bufs: &'a [IoSlice<'b>],
2074 nwritten: usize,
2075 }
2076
2077 impl fmt::Debug for Vectored<'_, '_> {
2078 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2079 let mut left = self.nwritten;
2080 for buf in self.bufs.iter() {
2081 if left == 0 {
2082 break;
2083 }
2084 let n = min(left, buf.len());
2085 Escape::new(&buf[..n]).fmt(f)?;
2086 left -= n;
2087 }
2088 Ok(())
2089 }
2090 }
2091}