1#[cfg(feature = "__tls")]
2use http::header::HeaderValue;
3#[cfg(feature = "__tls")]
4use http::uri::Scheme;
5use http::Uri;
6use hyper::rt::{Read, ReadBufCursor, Write};
7use hyper_util::client::legacy::connect::{Connected, Connection};
8#[cfg(any(feature = "socks", feature = "__tls", unix))]
9use hyper_util::rt::TokioIo;
10#[cfg(feature = "default-tls")]
11use native_tls_crate::{TlsConnector, TlsConnectorBuilder};
12use pin_project_lite::pin_project;
13use tower::util::{BoxCloneSyncServiceLayer, MapRequestLayer};
14use tower::{timeout::TimeoutLayer, util::BoxCloneSyncService, ServiceBuilder};
15use tower_service::Service;
16
17use std::future::Future;
18use std::io::{self, IoSlice};
19use std::net::IpAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use std::time::Duration;
24
25#[cfg(feature = "default-tls")]
26use self::native_tls_conn::NativeTlsConn;
27#[cfg(feature = "__rustls")]
28use self::rustls_tls_conn::RustlsTlsConn;
29use crate::dns::DynResolver;
30use crate::error::{cast_to_internal_error, BoxError};
31use crate::proxy::{Intercepted, Matcher as ProxyMatcher};
32use sealed::{Conn, Unnameable};
33
34pub(crate) type HttpConnector = hyper_util::client::legacy::connect::HttpConnector<DynResolver>;
35
36#[derive(Clone)]
37pub(crate) enum Connector {
38 Simple(ConnectorService),
40 WithLayers(BoxCloneSyncService<Unnameable, Conn, BoxError>),
43}
44
45impl Service<Uri> for Connector {
46 type Response = Conn;
47 type Error = BoxError;
48 type Future = Connecting;
49
50 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
51 match self {
52 Connector::Simple(service) => service.poll_ready(cx),
53 Connector::WithLayers(service) => service.poll_ready(cx),
54 }
55 }
56
57 fn call(&mut self, dst: Uri) -> Self::Future {
58 match self {
59 Connector::Simple(service) => service.call(dst),
60 Connector::WithLayers(service) => service.call(Unnameable(dst)),
61 }
62 }
63}
64
65pub(crate) type BoxedConnectorService = BoxCloneSyncService<Unnameable, Conn, BoxError>;
66
67pub(crate) type BoxedConnectorLayer =
68 BoxCloneSyncServiceLayer<BoxedConnectorService, Unnameable, Conn, BoxError>;
69
70pub(crate) struct ConnectorBuilder {
71 inner: Inner,
72 proxies: Arc<Vec<ProxyMatcher>>,
73 verbose: verbose::Wrapper,
74 timeout: Option<Duration>,
75 #[cfg(feature = "__tls")]
76 nodelay: bool,
77 #[cfg(feature = "__tls")]
78 tls_info: bool,
79 #[cfg(feature = "__tls")]
80 user_agent: Option<HeaderValue>,
81 #[cfg(feature = "socks")]
82 resolver: Option<DynResolver>,
83 #[cfg(unix)]
84 unix_socket: Option<Arc<std::path::Path>>,
85}
86
87impl ConnectorBuilder {
88 pub(crate) fn build(self, layers: Vec<BoxedConnectorLayer>) -> Connector
89where {
90 let mut base_service = ConnectorService {
92 inner: self.inner,
93 proxies: self.proxies,
94 verbose: self.verbose,
95 #[cfg(feature = "__tls")]
96 nodelay: self.nodelay,
97 #[cfg(feature = "__tls")]
98 tls_info: self.tls_info,
99 #[cfg(feature = "__tls")]
100 user_agent: self.user_agent,
101 simple_timeout: None,
102 #[cfg(feature = "socks")]
103 resolver: self.resolver.unwrap_or_else(DynResolver::gai),
104 #[cfg(unix)]
105 unix_socket: self.unix_socket,
106 };
107
108 #[cfg(unix)]
109 if base_service.unix_socket.is_some() && !base_service.proxies.is_empty() {
110 base_service.proxies = Default::default();
111 log::trace!("unix_socket() set, proxies are ignored");
112 }
113
114 if layers.is_empty() {
115 base_service.simple_timeout = self.timeout;
117 return Connector::Simple(base_service);
118 }
119
120 let unnameable_service = ServiceBuilder::new()
124 .layer(MapRequestLayer::new(|request: Unnameable| request.0))
125 .service(base_service);
126 let mut service = BoxCloneSyncService::new(unnameable_service);
127
128 for layer in layers {
129 service = ServiceBuilder::new().layer(layer).service(service);
130 }
131
132 match self.timeout {
136 Some(timeout) => {
137 let service = ServiceBuilder::new()
138 .layer(TimeoutLayer::new(timeout))
139 .service(service);
140 let service = ServiceBuilder::new()
141 .map_err(|error: BoxError| cast_to_internal_error(error))
142 .service(service);
143 let service = BoxCloneSyncService::new(service);
144
145 Connector::WithLayers(service)
146 }
147 None => {
148 let service = ServiceBuilder::new().service(service);
152 let service = ServiceBuilder::new()
153 .map_err(|error: BoxError| cast_to_internal_error(error))
154 .service(service);
155 let service = BoxCloneSyncService::new(service);
156 Connector::WithLayers(service)
157 }
158 }
159 }
160
161 #[cfg(not(feature = "__tls"))]
162 pub(crate) fn new<T>(
163 mut http: HttpConnector,
164 proxies: Arc<Vec<ProxyMatcher>>,
165 local_addr: T,
166 #[cfg(any(
167 target_os = "android",
168 target_os = "fuchsia",
169 target_os = "illumos",
170 target_os = "ios",
171 target_os = "linux",
172 target_os = "macos",
173 target_os = "solaris",
174 target_os = "tvos",
175 target_os = "visionos",
176 target_os = "watchos",
177 ))]
178 interface: Option<&str>,
179 nodelay: bool,
180 ) -> ConnectorBuilder
181 where
182 T: Into<Option<IpAddr>>,
183 {
184 http.set_local_address(local_addr.into());
185 #[cfg(any(
186 target_os = "android",
187 target_os = "fuchsia",
188 target_os = "illumos",
189 target_os = "ios",
190 target_os = "linux",
191 target_os = "macos",
192 target_os = "solaris",
193 target_os = "tvos",
194 target_os = "visionos",
195 target_os = "watchos",
196 ))]
197 if let Some(interface) = interface {
198 http.set_interface(interface.to_owned());
199 }
200 http.set_nodelay(nodelay);
201
202 ConnectorBuilder {
203 inner: Inner::Http(http),
204 proxies,
205 verbose: verbose::OFF,
206 timeout: None,
207 #[cfg(feature = "socks")]
208 resolver: None,
209 #[cfg(unix)]
210 unix_socket: None,
211 }
212 }
213
214 #[cfg(feature = "default-tls")]
215 pub(crate) fn new_default_tls<T>(
216 http: HttpConnector,
217 tls: TlsConnectorBuilder,
218 proxies: Arc<Vec<ProxyMatcher>>,
219 user_agent: Option<HeaderValue>,
220 local_addr: T,
221 #[cfg(any(
222 target_os = "android",
223 target_os = "fuchsia",
224 target_os = "illumos",
225 target_os = "ios",
226 target_os = "linux",
227 target_os = "macos",
228 target_os = "solaris",
229 target_os = "tvos",
230 target_os = "visionos",
231 target_os = "watchos",
232 ))]
233 interface: Option<&str>,
234 nodelay: bool,
235 tls_info: bool,
236 ) -> crate::Result<ConnectorBuilder>
237 where
238 T: Into<Option<IpAddr>>,
239 {
240 let tls = tls.build().map_err(crate::error::builder)?;
241 Ok(Self::from_built_default_tls(
242 http,
243 tls,
244 proxies,
245 user_agent,
246 local_addr,
247 #[cfg(any(
248 target_os = "android",
249 target_os = "fuchsia",
250 target_os = "illumos",
251 target_os = "ios",
252 target_os = "linux",
253 target_os = "macos",
254 target_os = "solaris",
255 target_os = "tvos",
256 target_os = "visionos",
257 target_os = "watchos",
258 ))]
259 interface,
260 nodelay,
261 tls_info,
262 ))
263 }
264
265 #[cfg(feature = "default-tls")]
266 pub(crate) fn from_built_default_tls<T>(
267 mut http: HttpConnector,
268 tls: TlsConnector,
269 proxies: Arc<Vec<ProxyMatcher>>,
270 user_agent: Option<HeaderValue>,
271 local_addr: T,
272 #[cfg(any(
273 target_os = "android",
274 target_os = "fuchsia",
275 target_os = "illumos",
276 target_os = "ios",
277 target_os = "linux",
278 target_os = "macos",
279 target_os = "solaris",
280 target_os = "tvos",
281 target_os = "visionos",
282 target_os = "watchos",
283 ))]
284 interface: Option<&str>,
285 nodelay: bool,
286 tls_info: bool,
287 ) -> ConnectorBuilder
288 where
289 T: Into<Option<IpAddr>>,
290 {
291 http.set_local_address(local_addr.into());
292 #[cfg(any(
293 target_os = "android",
294 target_os = "fuchsia",
295 target_os = "illumos",
296 target_os = "ios",
297 target_os = "linux",
298 target_os = "macos",
299 target_os = "solaris",
300 target_os = "tvos",
301 target_os = "visionos",
302 target_os = "watchos",
303 ))]
304 if let Some(interface) = interface {
305 http.set_interface(interface);
306 }
307 http.set_nodelay(nodelay);
308 http.enforce_http(false);
309
310 ConnectorBuilder {
311 inner: Inner::DefaultTls(http, tls),
312 proxies,
313 verbose: verbose::OFF,
314 nodelay,
315 tls_info,
316 user_agent,
317 timeout: None,
318 #[cfg(feature = "socks")]
319 resolver: None,
320 #[cfg(unix)]
321 unix_socket: None,
322 }
323 }
324
325 #[cfg(feature = "__rustls")]
326 pub(crate) fn new_rustls_tls<T>(
327 mut http: HttpConnector,
328 tls: rustls::ClientConfig,
329 proxies: Arc<Vec<ProxyMatcher>>,
330 user_agent: Option<HeaderValue>,
331 local_addr: T,
332 #[cfg(any(
333 target_os = "android",
334 target_os = "fuchsia",
335 target_os = "illumos",
336 target_os = "ios",
337 target_os = "linux",
338 target_os = "macos",
339 target_os = "solaris",
340 target_os = "tvos",
341 target_os = "visionos",
342 target_os = "watchos",
343 ))]
344 interface: Option<&str>,
345 nodelay: bool,
346 tls_info: bool,
347 ) -> ConnectorBuilder
348 where
349 T: Into<Option<IpAddr>>,
350 {
351 http.set_local_address(local_addr.into());
352 #[cfg(any(
353 target_os = "android",
354 target_os = "fuchsia",
355 target_os = "illumos",
356 target_os = "ios",
357 target_os = "linux",
358 target_os = "macos",
359 target_os = "solaris",
360 target_os = "tvos",
361 target_os = "visionos",
362 target_os = "watchos",
363 ))]
364 if let Some(interface) = interface {
365 http.set_interface(interface.to_owned());
366 }
367 http.set_nodelay(nodelay);
368 http.enforce_http(false);
369
370 let (tls, tls_proxy) = if proxies.is_empty() {
371 let tls = Arc::new(tls);
372 (tls.clone(), tls)
373 } else {
374 let mut tls_proxy = tls.clone();
375 tls_proxy.alpn_protocols.clear();
376 (Arc::new(tls), Arc::new(tls_proxy))
377 };
378
379 ConnectorBuilder {
380 inner: Inner::RustlsTls {
381 http,
382 tls,
383 tls_proxy,
384 },
385 proxies,
386 verbose: verbose::OFF,
387 nodelay,
388 tls_info,
389 user_agent,
390 timeout: None,
391 #[cfg(feature = "socks")]
392 resolver: None,
393 #[cfg(unix)]
394 unix_socket: None,
395 }
396 }
397
398 pub(crate) fn set_timeout(&mut self, timeout: Option<Duration>) {
399 self.timeout = timeout;
400 }
401
402 pub(crate) fn set_verbose(&mut self, enabled: bool) {
403 self.verbose.0 = enabled;
404 }
405
406 pub(crate) fn set_keepalive(&mut self, dur: Option<Duration>) {
407 match &mut self.inner {
408 #[cfg(feature = "default-tls")]
409 Inner::DefaultTls(http, _tls) => http.set_keepalive(dur),
410 #[cfg(feature = "__rustls")]
411 Inner::RustlsTls { http, .. } => http.set_keepalive(dur),
412 #[cfg(not(feature = "__tls"))]
413 Inner::Http(http) => http.set_keepalive(dur),
414 }
415 }
416
417 pub(crate) fn set_keepalive_interval(&mut self, dur: Option<Duration>) {
418 match &mut self.inner {
419 #[cfg(feature = "default-tls")]
420 Inner::DefaultTls(http, _tls) => http.set_keepalive_interval(dur),
421 #[cfg(feature = "__rustls")]
422 Inner::RustlsTls { http, .. } => http.set_keepalive_interval(dur),
423 #[cfg(not(feature = "__tls"))]
424 Inner::Http(http) => http.set_keepalive_interval(dur),
425 }
426 }
427
428 pub(crate) fn set_keepalive_retries(&mut self, retries: Option<u32>) {
429 match &mut self.inner {
430 #[cfg(feature = "default-tls")]
431 Inner::DefaultTls(http, _tls) => http.set_keepalive_retries(retries),
432 #[cfg(feature = "__rustls")]
433 Inner::RustlsTls { http, .. } => http.set_keepalive_retries(retries),
434 #[cfg(not(feature = "__tls"))]
435 Inner::Http(http) => http.set_keepalive_retries(retries),
436 }
437 }
438
439 #[cfg(feature = "socks")]
440 pub(crate) fn set_socks_resolver(&mut self, resolver: DynResolver) {
441 self.resolver = Some(resolver);
442 }
443
444 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
445 pub(crate) fn set_tcp_user_timeout(&mut self, dur: Option<Duration>) {
446 match &mut self.inner {
447 #[cfg(feature = "default-tls")]
448 Inner::DefaultTls(http, _tls) => http.set_tcp_user_timeout(dur),
449 #[cfg(feature = "__rustls")]
450 Inner::RustlsTls { http, .. } => http.set_tcp_user_timeout(dur),
451 #[cfg(not(feature = "__tls"))]
452 Inner::Http(http) => http.set_tcp_user_timeout(dur),
453 }
454 }
455
456 #[cfg(unix)]
457 pub(crate) fn set_unix_socket(&mut self, path: Option<Arc<std::path::Path>>) {
458 self.unix_socket = path;
459 }
460}
461
462#[allow(missing_debug_implementations)]
463#[derive(Clone)]
464pub(crate) struct ConnectorService {
465 inner: Inner,
466 proxies: Arc<Vec<ProxyMatcher>>,
467 verbose: verbose::Wrapper,
468 simple_timeout: Option<Duration>,
473 #[cfg(feature = "__tls")]
474 nodelay: bool,
475 #[cfg(feature = "__tls")]
476 tls_info: bool,
477 #[cfg(feature = "__tls")]
478 user_agent: Option<HeaderValue>,
479 #[cfg(feature = "socks")]
480 resolver: DynResolver,
481 #[cfg(unix)]
483 unix_socket: Option<Arc<std::path::Path>>,
484}
485
486#[derive(Clone)]
487enum Inner {
488 #[cfg(not(feature = "__tls"))]
489 Http(HttpConnector),
490 #[cfg(feature = "default-tls")]
491 DefaultTls(HttpConnector, TlsConnector),
492 #[cfg(feature = "__rustls")]
493 RustlsTls {
494 http: HttpConnector,
495 tls: Arc<rustls::ClientConfig>,
496 tls_proxy: Arc<rustls::ClientConfig>,
497 },
498}
499
500impl Inner {
501 #[cfg(feature = "socks")]
502 fn get_http_connector(&mut self) -> &mut crate::connect::HttpConnector {
503 match self {
504 #[cfg(feature = "default-tls")]
505 Inner::DefaultTls(http, _) => http,
506 #[cfg(feature = "__rustls")]
507 Inner::RustlsTls { http, .. } => http,
508 #[cfg(not(feature = "__tls"))]
509 Inner::Http(http) => http,
510 }
511 }
512}
513
514impl ConnectorService {
515 #[cfg(feature = "socks")]
516 async fn connect_socks(mut self, dst: Uri, proxy: Intercepted) -> Result<Conn, BoxError> {
517 let dns = match proxy.uri().scheme_str() {
518 Some("socks4") | Some("socks5") => socks::DnsResolve::Local,
519 Some("socks4a") | Some("socks5h") => socks::DnsResolve::Proxy,
520 _ => {
521 unreachable!("connect_socks is only called for socks proxies");
522 }
523 };
524
525 match &mut self.inner {
526 #[cfg(feature = "default-tls")]
527 Inner::DefaultTls(http, tls) => {
528 if dst.scheme() == Some(&Scheme::HTTPS) {
529 let host = dst.host().ok_or("no host in url")?.to_string();
530 let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
531 let conn = TokioIo::new(conn);
532 let conn = TokioIo::new(conn);
533 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
534 let io = tls_connector.connect(&host, conn).await?;
535 let io = TokioIo::new(io);
536 return Ok(Conn {
537 inner: self.verbose.wrap(NativeTlsConn { inner: io }),
538 is_proxy: false,
539 tls_info: self.tls_info,
540 });
541 }
542 }
543 #[cfg(feature = "__rustls")]
544 Inner::RustlsTls { http, tls, .. } => {
545 if dst.scheme() == Some(&Scheme::HTTPS) {
546 use std::convert::TryFrom;
547 use tokio_rustls::TlsConnector as RustlsConnector;
548
549 let tls = tls.clone();
550 let host = dst.host().ok_or("no host in url")?.to_string();
551 let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
552 let conn = TokioIo::new(conn);
553 let conn = TokioIo::new(conn);
554 let server_name =
555 rustls_pki_types::ServerName::try_from(host.as_str().to_owned())
556 .map_err(|_| "Invalid Server Name")?;
557 let io = RustlsConnector::from(tls)
558 .connect(server_name, conn)
559 .await?;
560 let io = TokioIo::new(io);
561 return Ok(Conn {
562 inner: self.verbose.wrap(RustlsTlsConn { inner: io }),
563 is_proxy: false,
564 tls_info: false,
565 });
566 }
567 }
568 #[cfg(not(feature = "__tls"))]
569 Inner::Http(http) => {
570 let conn = socks::connect(proxy, dst, dns, &self.resolver, http).await?;
571 return Ok(Conn {
572 inner: self.verbose.wrap(TokioIo::new(conn)),
573 is_proxy: false,
574 tls_info: false,
575 });
576 }
577 }
578
579 let resolver = &self.resolver;
580 let http = self.inner.get_http_connector();
581 socks::connect(proxy, dst, dns, resolver, http)
582 .await
583 .map(|tcp| Conn {
584 inner: self.verbose.wrap(TokioIo::new(tcp)),
585 is_proxy: false,
586 tls_info: false,
587 })
588 .map_err(Into::into)
589 }
590
591 async fn connect_with_maybe_proxy(self, dst: Uri, is_proxy: bool) -> Result<Conn, BoxError> {
592 match self.inner {
593 #[cfg(not(feature = "__tls"))]
594 Inner::Http(mut http) => {
595 let io = http.call(dst).await?;
596 Ok(Conn {
597 inner: self.verbose.wrap(io),
598 is_proxy,
599 tls_info: false,
600 })
601 }
602 #[cfg(feature = "default-tls")]
603 Inner::DefaultTls(http, tls) => {
604 let mut http = http.clone();
605
606 if !self.nodelay && (dst.scheme() == Some(&Scheme::HTTPS)) {
610 http.set_nodelay(true);
611 }
612
613 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
614 let mut http = hyper_tls::HttpsConnector::from((http, tls_connector));
615 let io = http.call(dst).await?;
616
617 if let hyper_tls::MaybeHttpsStream::Https(stream) = io {
618 if !self.nodelay {
619 stream
620 .inner()
621 .get_ref()
622 .get_ref()
623 .get_ref()
624 .inner()
625 .inner()
626 .set_nodelay(false)?;
627 }
628 Ok(Conn {
629 inner: self.verbose.wrap(NativeTlsConn { inner: stream }),
630 is_proxy,
631 tls_info: self.tls_info,
632 })
633 } else {
634 Ok(Conn {
635 inner: self.verbose.wrap(io),
636 is_proxy,
637 tls_info: false,
638 })
639 }
640 }
641 #[cfg(feature = "__rustls")]
642 Inner::RustlsTls { http, tls, .. } => {
643 let mut http = http.clone();
644
645 if !self.nodelay && (dst.scheme() == Some(&Scheme::HTTPS)) {
649 http.set_nodelay(true);
650 }
651
652 let mut http = hyper_rustls::HttpsConnector::from((http, tls.clone()));
653 let io = http.call(dst).await?;
654
655 if let hyper_rustls::MaybeHttpsStream::Https(stream) = io {
656 if !self.nodelay {
657 let (io, _) = stream.inner().get_ref();
658 io.inner().inner().set_nodelay(false)?;
659 }
660 Ok(Conn {
661 inner: self.verbose.wrap(RustlsTlsConn { inner: stream }),
662 is_proxy,
663 tls_info: self.tls_info,
664 })
665 } else {
666 Ok(Conn {
667 inner: self.verbose.wrap(io),
668 is_proxy,
669 tls_info: false,
670 })
671 }
672 }
673 }
674 }
675
676 #[cfg(unix)]
678 async fn connect_local_transport(self, dst: Uri) -> Result<Conn, BoxError> {
679 let path = self
680 .unix_socket
681 .as_ref()
682 .expect("connect local must have socket path")
683 .clone();
684 let svc = tower::service_fn(move |_| {
685 let fut = tokio::net::UnixStream::connect(path.clone());
686 async move {
687 let io = fut.await?;
688 Ok::<_, std::io::Error>(TokioIo::new(io))
689 }
690 });
691 let is_proxy = false;
692 match self.inner {
693 #[cfg(not(feature = "__tls"))]
694 Inner::Http(..) => {
695 let mut svc = svc;
696 let io = svc.call(dst).await?;
697 Ok(Conn {
698 inner: self.verbose.wrap(io),
699 is_proxy,
700 tls_info: false,
701 })
702 }
703 #[cfg(feature = "default-tls")]
704 Inner::DefaultTls(_, tls) => {
705 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
706 let mut http = hyper_tls::HttpsConnector::from((svc, tls_connector));
707 let io = http.call(dst).await?;
708
709 if let hyper_tls::MaybeHttpsStream::Https(stream) = io {
710 Ok(Conn {
711 inner: self.verbose.wrap(NativeTlsConn { inner: stream }),
712 is_proxy,
713 tls_info: self.tls_info,
714 })
715 } else {
716 Ok(Conn {
717 inner: self.verbose.wrap(io),
718 is_proxy,
719 tls_info: false,
720 })
721 }
722 }
723 #[cfg(feature = "__rustls")]
724 Inner::RustlsTls { tls, .. } => {
725 let mut http = hyper_rustls::HttpsConnector::from((svc, tls.clone()));
726 let io = http.call(dst).await?;
727
728 if let hyper_rustls::MaybeHttpsStream::Https(stream) = io {
729 Ok(Conn {
730 inner: self.verbose.wrap(RustlsTlsConn { inner: stream }),
731 is_proxy,
732 tls_info: self.tls_info,
733 })
734 } else {
735 Ok(Conn {
736 inner: self.verbose.wrap(io),
737 is_proxy,
738 tls_info: false,
739 })
740 }
741 }
742 }
743 }
744
745 async fn connect_via_proxy(self, dst: Uri, proxy: Intercepted) -> Result<Conn, BoxError> {
746 log::debug!("proxy({proxy:?}) intercepts '{dst:?}'");
747
748 #[cfg(feature = "socks")]
749 match proxy.uri().scheme_str().ok_or("proxy scheme expected")? {
750 "socks4" | "socks4a" | "socks5" | "socks5h" => {
751 return self.connect_socks(dst, proxy).await
752 }
753 _ => (),
754 }
755
756 let proxy_dst = proxy.uri().clone();
757 #[cfg(feature = "__tls")]
758 let auth = proxy.basic_auth().cloned();
759
760 #[cfg(feature = "__tls")]
761 let misc = proxy.custom_headers().clone();
762
763 match &self.inner {
764 #[cfg(feature = "default-tls")]
765 Inner::DefaultTls(http, tls) => {
766 if dst.scheme() == Some(&Scheme::HTTPS) {
767 log::trace!("tunneling HTTPS over proxy");
768 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
769 let inner =
770 hyper_tls::HttpsConnector::from((http.clone(), tls_connector.clone()));
771 let mut tunnel =
773 hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_dst, inner);
774 if let Some(auth) = auth {
775 tunnel = tunnel.with_auth(auth);
776 }
777 if let Some(ua) = self.user_agent {
778 let mut headers = http::HeaderMap::new();
779 headers.insert(http::header::USER_AGENT, ua);
780 tunnel = tunnel.with_headers(headers);
781 }
782 if let Some(custom_headers) = misc {
784 tunnel = tunnel.with_headers(custom_headers.clone());
785 }
786 let tunneled = tunnel.call(dst.clone()).await?;
789 let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
790 let io = tls_connector
791 .connect(dst.host().ok_or("no host in url")?, TokioIo::new(tunneled))
792 .await?;
793 return Ok(Conn {
794 inner: self.verbose.wrap(NativeTlsConn {
795 inner: TokioIo::new(io),
796 }),
797 is_proxy: false,
798 tls_info: false,
799 });
800 }
801 }
802 #[cfg(feature = "__rustls")]
803 Inner::RustlsTls {
804 http,
805 tls,
806 tls_proxy,
807 } => {
808 if dst.scheme() == Some(&Scheme::HTTPS) {
809 use rustls_pki_types::ServerName;
810 use std::convert::TryFrom;
811 use tokio_rustls::TlsConnector as RustlsConnector;
812
813 log::trace!("tunneling HTTPS over proxy");
814 let http = http.clone();
815 let inner = hyper_rustls::HttpsConnector::from((http, tls_proxy.clone()));
816 let mut tunnel =
818 hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_dst, inner);
819 if let Some(auth) = auth {
820 tunnel = tunnel.with_auth(auth);
821 }
822 if let Some(custom_headers) = misc {
823 tunnel = tunnel.with_headers(custom_headers.clone());
824 }
825 if let Some(ua) = self.user_agent {
826 let mut headers = http::HeaderMap::new();
827 headers.insert(http::header::USER_AGENT, ua);
828 tunnel = tunnel.with_headers(headers);
829 }
830 let tunneled = tunnel.call(dst.clone()).await?;
833 let host = dst.host().ok_or("no host in url")?.to_string();
834 let server_name = ServerName::try_from(host.as_str().to_owned())
835 .map_err(|_| "Invalid Server Name")?;
836 let io = RustlsConnector::from(tls.clone())
837 .connect(server_name, TokioIo::new(tunneled))
838 .await?;
839
840 return Ok(Conn {
841 inner: self.verbose.wrap(RustlsTlsConn {
842 inner: TokioIo::new(io),
843 }),
844 is_proxy: false,
845 tls_info: false,
846 });
847 }
848 }
849 #[cfg(not(feature = "__tls"))]
850 Inner::Http(_) => (),
851 }
852
853 self.connect_with_maybe_proxy(proxy_dst, true).await
854 }
855}
856
857async fn with_timeout<T, F>(f: F, timeout: Option<Duration>) -> Result<T, BoxError>
858where
859 F: Future<Output = Result<T, BoxError>>,
860{
861 if let Some(to) = timeout {
862 match tokio::time::timeout(to, f).await {
863 Err(_elapsed) => Err(Box::new(crate::error::TimedOut) as BoxError),
864 Ok(Ok(try_res)) => Ok(try_res),
865 Ok(Err(e)) => Err(e),
866 }
867 } else {
868 f.await
869 }
870}
871
872impl Service<Uri> for ConnectorService {
873 type Response = Conn;
874 type Error = BoxError;
875 type Future = Connecting;
876
877 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
878 Poll::Ready(Ok(()))
879 }
880
881 fn call(&mut self, dst: Uri) -> Self::Future {
882 log::debug!("starting new connection: {dst:?}");
883 let timeout = self.simple_timeout;
884
885 #[cfg(unix)]
887 if self.unix_socket.is_some() {
888 return Box::pin(with_timeout(
889 self.clone().connect_local_transport(dst),
890 timeout,
891 ));
892 }
893
894 for prox in self.proxies.iter() {
895 if let Some(intercepted) = prox.intercept(&dst) {
896 return Box::pin(with_timeout(
897 self.clone().connect_via_proxy(dst, intercepted),
898 timeout,
899 ));
900 }
901 }
902
903 Box::pin(with_timeout(
904 self.clone().connect_with_maybe_proxy(dst, false),
905 timeout,
906 ))
907 }
908}
909
910#[cfg(feature = "__tls")]
911trait TlsInfoFactory {
912 fn tls_info(&self) -> Option<crate::tls::TlsInfo>;
913}
914
915#[cfg(feature = "__tls")]
916impl<T: TlsInfoFactory> TlsInfoFactory for TokioIo<T> {
917 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
918 self.inner().tls_info()
919 }
920}
921
922#[cfg(feature = "__tls")]
925impl TlsInfoFactory for tokio::net::TcpStream {
926 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
927 None
928 }
929}
930
931#[cfg(feature = "default-tls")]
932impl TlsInfoFactory for tokio_native_tls::TlsStream<TokioIo<TokioIo<tokio::net::TcpStream>>> {
933 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
934 let peer_certificate = self
935 .get_ref()
936 .peer_certificate()
937 .ok()
938 .flatten()
939 .and_then(|c| c.to_der().ok());
940 Some(crate::tls::TlsInfo { peer_certificate })
941 }
942}
943
944#[cfg(feature = "default-tls")]
945impl TlsInfoFactory
946 for tokio_native_tls::TlsStream<
947 TokioIo<hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>>>,
948 >
949{
950 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
951 let peer_certificate = self
952 .get_ref()
953 .peer_certificate()
954 .ok()
955 .flatten()
956 .and_then(|c| c.to_der().ok());
957 Some(crate::tls::TlsInfo { peer_certificate })
958 }
959}
960
961#[cfg(feature = "default-tls")]
962impl TlsInfoFactory for hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>> {
963 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
964 match self {
965 hyper_tls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
966 hyper_tls::MaybeHttpsStream::Http(_) => None,
967 }
968 }
969}
970
971#[cfg(feature = "__rustls")]
972impl TlsInfoFactory for tokio_rustls::client::TlsStream<TokioIo<TokioIo<tokio::net::TcpStream>>> {
973 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
974 let peer_certificate = self
975 .get_ref()
976 .1
977 .peer_certificates()
978 .and_then(|certs| certs.first())
979 .map(|c| c.to_vec());
980 Some(crate::tls::TlsInfo { peer_certificate })
981 }
982}
983
984#[cfg(feature = "__rustls")]
985impl TlsInfoFactory
986 for tokio_rustls::client::TlsStream<
987 TokioIo<hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>>>,
988 >
989{
990 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
991 let peer_certificate = self
992 .get_ref()
993 .1
994 .peer_certificates()
995 .and_then(|certs| certs.first())
996 .map(|c| c.to_vec());
997 Some(crate::tls::TlsInfo { peer_certificate })
998 }
999}
1000
1001#[cfg(feature = "__rustls")]
1002impl TlsInfoFactory for hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>> {
1003 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1004 match self {
1005 hyper_rustls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1006 hyper_rustls::MaybeHttpsStream::Http(_) => None,
1007 }
1008 }
1009}
1010
1011#[cfg(feature = "__tls")]
1014#[cfg(unix)]
1015impl TlsInfoFactory for tokio::net::UnixStream {
1016 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1017 None
1018 }
1019}
1020
1021#[cfg(feature = "default-tls")]
1022#[cfg(unix)]
1023impl TlsInfoFactory for tokio_native_tls::TlsStream<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1024 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1025 let peer_certificate = self
1026 .get_ref()
1027 .peer_certificate()
1028 .ok()
1029 .flatten()
1030 .and_then(|c| c.to_der().ok());
1031 Some(crate::tls::TlsInfo { peer_certificate })
1032 }
1033}
1034
1035#[cfg(feature = "default-tls")]
1036#[cfg(unix)]
1037impl TlsInfoFactory
1038 for tokio_native_tls::TlsStream<
1039 TokioIo<hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>,
1040 >
1041{
1042 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1043 let peer_certificate = self
1044 .get_ref()
1045 .peer_certificate()
1046 .ok()
1047 .flatten()
1048 .and_then(|c| c.to_der().ok());
1049 Some(crate::tls::TlsInfo { peer_certificate })
1050 }
1051}
1052
1053#[cfg(feature = "default-tls")]
1054#[cfg(unix)]
1055impl TlsInfoFactory for hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>> {
1056 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1057 match self {
1058 hyper_tls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1059 hyper_tls::MaybeHttpsStream::Http(_) => None,
1060 }
1061 }
1062}
1063
1064#[cfg(feature = "__rustls")]
1065#[cfg(unix)]
1066impl TlsInfoFactory for tokio_rustls::client::TlsStream<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1067 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1068 let peer_certificate = self
1069 .get_ref()
1070 .1
1071 .peer_certificates()
1072 .and_then(|certs| certs.first())
1073 .map(|c| c.to_vec());
1074 Some(crate::tls::TlsInfo { peer_certificate })
1075 }
1076}
1077
1078#[cfg(feature = "__rustls")]
1079#[cfg(unix)]
1080impl TlsInfoFactory
1081 for tokio_rustls::client::TlsStream<
1082 TokioIo<hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>,
1083 >
1084{
1085 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1086 let peer_certificate = self
1087 .get_ref()
1088 .1
1089 .peer_certificates()
1090 .and_then(|certs| certs.first())
1091 .map(|c| c.to_vec());
1092 Some(crate::tls::TlsInfo { peer_certificate })
1093 }
1094}
1095
1096#[cfg(feature = "__rustls")]
1097#[cfg(unix)]
1098impl TlsInfoFactory for hyper_rustls::MaybeHttpsStream<TokioIo<tokio::net::UnixStream>> {
1099 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1100 match self {
1101 hyper_rustls::MaybeHttpsStream::Https(tls) => tls.tls_info(),
1102 hyper_rustls::MaybeHttpsStream::Http(_) => None,
1103 }
1104 }
1105}
1106
1107pub(crate) trait AsyncConn:
1108 Read + Write + Connection + Send + Sync + Unpin + 'static
1109{
1110}
1111
1112impl<T: Read + Write + Connection + Send + Sync + Unpin + 'static> AsyncConn for T {}
1113
1114#[cfg(feature = "__tls")]
1115trait AsyncConnWithInfo: AsyncConn + TlsInfoFactory {}
1116#[cfg(not(feature = "__tls"))]
1117trait AsyncConnWithInfo: AsyncConn {}
1118
1119#[cfg(feature = "__tls")]
1120impl<T: AsyncConn + TlsInfoFactory> AsyncConnWithInfo for T {}
1121#[cfg(not(feature = "__tls"))]
1122impl<T: AsyncConn> AsyncConnWithInfo for T {}
1123
1124type BoxConn = Box<dyn AsyncConnWithInfo>;
1125
1126pub(crate) mod sealed {
1127 use super::*;
1128 #[derive(Debug)]
1129 pub struct Unnameable(pub(super) Uri);
1130
1131 pin_project! {
1132 #[allow(missing_debug_implementations)]
1137 pub struct Conn {
1138 #[pin]
1139 pub(super)inner: BoxConn,
1140 pub(super) is_proxy: bool,
1141 pub(super) tls_info: bool,
1143 }
1144 }
1145
1146 impl Connection for Conn {
1147 fn connected(&self) -> Connected {
1148 let connected = self.inner.connected().proxy(self.is_proxy);
1149 #[cfg(feature = "__tls")]
1150 if self.tls_info {
1151 if let Some(tls_info) = self.inner.tls_info() {
1152 connected.extra(tls_info)
1153 } else {
1154 connected
1155 }
1156 } else {
1157 connected
1158 }
1159 #[cfg(not(feature = "__tls"))]
1160 connected
1161 }
1162 }
1163
1164 impl Read for Conn {
1165 fn poll_read(
1166 self: Pin<&mut Self>,
1167 cx: &mut Context,
1168 buf: ReadBufCursor<'_>,
1169 ) -> Poll<io::Result<()>> {
1170 let this = self.project();
1171 Read::poll_read(this.inner, cx, buf)
1172 }
1173 }
1174
1175 impl Write for Conn {
1176 fn poll_write(
1177 self: Pin<&mut Self>,
1178 cx: &mut Context,
1179 buf: &[u8],
1180 ) -> Poll<Result<usize, io::Error>> {
1181 let this = self.project();
1182 Write::poll_write(this.inner, cx, buf)
1183 }
1184
1185 fn poll_write_vectored(
1186 self: Pin<&mut Self>,
1187 cx: &mut Context<'_>,
1188 bufs: &[IoSlice<'_>],
1189 ) -> Poll<Result<usize, io::Error>> {
1190 let this = self.project();
1191 Write::poll_write_vectored(this.inner, cx, bufs)
1192 }
1193
1194 fn is_write_vectored(&self) -> bool {
1195 self.inner.is_write_vectored()
1196 }
1197
1198 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
1199 let this = self.project();
1200 Write::poll_flush(this.inner, cx)
1201 }
1202
1203 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
1204 let this = self.project();
1205 Write::poll_shutdown(this.inner, cx)
1206 }
1207 }
1208}
1209
1210#[cfg(unix)]
1212pub(crate) mod uds {
1213 use std::path::Path;
1214
1215 #[cfg(unix)]
1222 pub trait UnixSocketProvider {
1223 #[doc(hidden)]
1224 fn reqwest_uds_path(&self, _: Internal) -> &Path;
1225 }
1226
1227 #[allow(missing_debug_implementations)]
1228 pub struct Internal;
1229
1230 macro_rules! as_path {
1231 ($($t:ty,)+) => {
1232 $(
1233 impl UnixSocketProvider for $t {
1234 #[doc(hidden)]
1235 fn reqwest_uds_path(&self, _: Internal) -> &Path {
1236 self.as_ref()
1237 }
1238 }
1239 )+
1240 }
1241 }
1242
1243 as_path![
1244 String,
1245 &'_ str,
1246 &'_ Path,
1247 std::path::PathBuf,
1248 std::sync::Arc<Path>,
1249 ];
1250}
1251
1252pub(crate) type Connecting = Pin<Box<dyn Future<Output = Result<Conn, BoxError>> + Send>>;
1253
1254#[cfg(feature = "default-tls")]
1255mod native_tls_conn {
1256 use super::TlsInfoFactory;
1257 use hyper::rt::{Read, ReadBufCursor, Write};
1258 use hyper_tls::MaybeHttpsStream;
1259 use hyper_util::client::legacy::connect::{Connected, Connection};
1260 use hyper_util::rt::TokioIo;
1261 use pin_project_lite::pin_project;
1262 use std::{
1263 io::{self, IoSlice},
1264 pin::Pin,
1265 task::{Context, Poll},
1266 };
1267 use tokio::io::{AsyncRead, AsyncWrite};
1268 use tokio::net::TcpStream;
1269 use tokio_native_tls::TlsStream;
1270
1271 pin_project! {
1272 pub(super) struct NativeTlsConn<T> {
1273 #[pin] pub(super) inner: TokioIo<TlsStream<T>>,
1274 }
1275 }
1276
1277 impl Connection for NativeTlsConn<TokioIo<TokioIo<TcpStream>>> {
1278 fn connected(&self) -> Connected {
1279 let connected = self
1280 .inner
1281 .inner()
1282 .get_ref()
1283 .get_ref()
1284 .get_ref()
1285 .inner()
1286 .connected();
1287 #[cfg(feature = "native-tls-alpn")]
1288 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1289 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1290 _ => connected,
1291 }
1292 #[cfg(not(feature = "native-tls-alpn"))]
1293 connected
1294 }
1295 }
1296
1297 impl Connection for NativeTlsConn<TokioIo<MaybeHttpsStream<TokioIo<TcpStream>>>> {
1298 fn connected(&self) -> Connected {
1299 let connected = self
1300 .inner
1301 .inner()
1302 .get_ref()
1303 .get_ref()
1304 .get_ref()
1305 .inner()
1306 .connected();
1307 #[cfg(feature = "native-tls-alpn")]
1308 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1309 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1310 _ => connected,
1311 }
1312 #[cfg(not(feature = "native-tls-alpn"))]
1313 connected
1314 }
1315 }
1316
1317 #[cfg(unix)]
1318 impl Connection for NativeTlsConn<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1319 fn connected(&self) -> Connected {
1320 let connected = Connected::new();
1321 #[cfg(feature = "native-tls-alpn")]
1322 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1323 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1324 _ => connected,
1325 }
1326 #[cfg(not(feature = "native-tls-alpn"))]
1327 connected
1328 }
1329 }
1330
1331 #[cfg(unix)]
1332 impl Connection for NativeTlsConn<TokioIo<MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>> {
1333 fn connected(&self) -> Connected {
1334 let connected = Connected::new();
1335 #[cfg(feature = "native-tls-alpn")]
1336 match self.inner.inner().get_ref().negotiated_alpn().ok() {
1337 Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => connected.negotiated_h2(),
1338 _ => connected,
1339 }
1340 #[cfg(not(feature = "native-tls-alpn"))]
1341 connected
1342 }
1343 }
1344
1345 impl<T: AsyncRead + AsyncWrite + Unpin> Read for NativeTlsConn<T> {
1346 fn poll_read(
1347 self: Pin<&mut Self>,
1348 cx: &mut Context,
1349 buf: ReadBufCursor<'_>,
1350 ) -> Poll<tokio::io::Result<()>> {
1351 let this = self.project();
1352 Read::poll_read(this.inner, cx, buf)
1353 }
1354 }
1355
1356 impl<T: AsyncRead + AsyncWrite + Unpin> Write for NativeTlsConn<T> {
1357 fn poll_write(
1358 self: Pin<&mut Self>,
1359 cx: &mut Context,
1360 buf: &[u8],
1361 ) -> Poll<Result<usize, tokio::io::Error>> {
1362 let this = self.project();
1363 Write::poll_write(this.inner, cx, buf)
1364 }
1365
1366 fn poll_write_vectored(
1367 self: Pin<&mut Self>,
1368 cx: &mut Context<'_>,
1369 bufs: &[IoSlice<'_>],
1370 ) -> Poll<Result<usize, io::Error>> {
1371 let this = self.project();
1372 Write::poll_write_vectored(this.inner, cx, bufs)
1373 }
1374
1375 fn is_write_vectored(&self) -> bool {
1376 self.inner.is_write_vectored()
1377 }
1378
1379 fn poll_flush(
1380 self: Pin<&mut Self>,
1381 cx: &mut Context,
1382 ) -> Poll<Result<(), tokio::io::Error>> {
1383 let this = self.project();
1384 Write::poll_flush(this.inner, cx)
1385 }
1386
1387 fn poll_shutdown(
1388 self: Pin<&mut Self>,
1389 cx: &mut Context,
1390 ) -> Poll<Result<(), tokio::io::Error>> {
1391 let this = self.project();
1392 Write::poll_shutdown(this.inner, cx)
1393 }
1394 }
1395
1396 impl<T> TlsInfoFactory for NativeTlsConn<T>
1397 where
1398 TokioIo<TlsStream<T>>: TlsInfoFactory,
1399 {
1400 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1401 self.inner.tls_info()
1402 }
1403 }
1404}
1405
1406#[cfg(feature = "__rustls")]
1407mod rustls_tls_conn {
1408 use super::TlsInfoFactory;
1409 use hyper::rt::{Read, ReadBufCursor, Write};
1410 use hyper_rustls::MaybeHttpsStream;
1411 use hyper_util::client::legacy::connect::{Connected, Connection};
1412 use hyper_util::rt::TokioIo;
1413 use pin_project_lite::pin_project;
1414 use std::{
1415 io::{self, IoSlice},
1416 pin::Pin,
1417 task::{Context, Poll},
1418 };
1419 use tokio::io::{AsyncRead, AsyncWrite};
1420 use tokio::net::TcpStream;
1421 use tokio_rustls::client::TlsStream;
1422
1423 pin_project! {
1424 pub(super) struct RustlsTlsConn<T> {
1425 #[pin] pub(super) inner: TokioIo<TlsStream<T>>,
1426 }
1427 }
1428
1429 impl Connection for RustlsTlsConn<TokioIo<TokioIo<TcpStream>>> {
1430 fn connected(&self) -> Connected {
1431 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1432 self.inner
1433 .inner()
1434 .get_ref()
1435 .0
1436 .inner()
1437 .connected()
1438 .negotiated_h2()
1439 } else {
1440 self.inner.inner().get_ref().0.inner().connected()
1441 }
1442 }
1443 }
1444 impl Connection for RustlsTlsConn<TokioIo<MaybeHttpsStream<TokioIo<TcpStream>>>> {
1445 fn connected(&self) -> Connected {
1446 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1447 self.inner
1448 .inner()
1449 .get_ref()
1450 .0
1451 .inner()
1452 .connected()
1453 .negotiated_h2()
1454 } else {
1455 self.inner.inner().get_ref().0.inner().connected()
1456 }
1457 }
1458 }
1459
1460 #[cfg(unix)]
1461 impl Connection for RustlsTlsConn<TokioIo<TokioIo<tokio::net::UnixStream>>> {
1462 fn connected(&self) -> Connected {
1463 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1464 self.inner
1465 .inner()
1466 .get_ref()
1467 .0
1468 .inner()
1469 .connected()
1470 .negotiated_h2()
1471 } else {
1472 self.inner.inner().get_ref().0.inner().connected()
1473 }
1474 }
1475 }
1476
1477 #[cfg(unix)]
1478 impl Connection for RustlsTlsConn<TokioIo<MaybeHttpsStream<TokioIo<tokio::net::UnixStream>>>> {
1479 fn connected(&self) -> Connected {
1480 if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") {
1481 self.inner
1482 .inner()
1483 .get_ref()
1484 .0
1485 .inner()
1486 .connected()
1487 .negotiated_h2()
1488 } else {
1489 self.inner.inner().get_ref().0.inner().connected()
1490 }
1491 }
1492 }
1493
1494 impl<T: AsyncRead + AsyncWrite + Unpin> Read for RustlsTlsConn<T> {
1495 fn poll_read(
1496 self: Pin<&mut Self>,
1497 cx: &mut Context,
1498 buf: ReadBufCursor<'_>,
1499 ) -> Poll<tokio::io::Result<()>> {
1500 let this = self.project();
1501 Read::poll_read(this.inner, cx, buf)
1502 }
1503 }
1504
1505 impl<T: AsyncRead + AsyncWrite + Unpin> Write for RustlsTlsConn<T> {
1506 fn poll_write(
1507 self: Pin<&mut Self>,
1508 cx: &mut Context,
1509 buf: &[u8],
1510 ) -> Poll<Result<usize, tokio::io::Error>> {
1511 let this = self.project();
1512 Write::poll_write(this.inner, cx, buf)
1513 }
1514
1515 fn poll_write_vectored(
1516 self: Pin<&mut Self>,
1517 cx: &mut Context<'_>,
1518 bufs: &[IoSlice<'_>],
1519 ) -> Poll<Result<usize, io::Error>> {
1520 let this = self.project();
1521 Write::poll_write_vectored(this.inner, cx, bufs)
1522 }
1523
1524 fn is_write_vectored(&self) -> bool {
1525 self.inner.is_write_vectored()
1526 }
1527
1528 fn poll_flush(
1529 self: Pin<&mut Self>,
1530 cx: &mut Context,
1531 ) -> Poll<Result<(), tokio::io::Error>> {
1532 let this = self.project();
1533 Write::poll_flush(this.inner, cx)
1534 }
1535
1536 fn poll_shutdown(
1537 self: Pin<&mut Self>,
1538 cx: &mut Context,
1539 ) -> Poll<Result<(), tokio::io::Error>> {
1540 let this = self.project();
1541 Write::poll_shutdown(this.inner, cx)
1542 }
1543 }
1544 impl<T> TlsInfoFactory for RustlsTlsConn<T>
1545 where
1546 TokioIo<TlsStream<T>>: TlsInfoFactory,
1547 {
1548 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1549 self.inner.tls_info()
1550 }
1551 }
1552}
1553
1554#[cfg(feature = "socks")]
1555mod socks {
1556 use tower_service::Service;
1557
1558 use http::uri::Scheme;
1559 use http::Uri;
1560 use hyper_util::client::legacy::connect::proxy::{SocksV4, SocksV5};
1561 use tokio::net::TcpStream;
1562
1563 use super::BoxError;
1564 use crate::proxy::Intercepted;
1565
1566 pub(super) enum DnsResolve {
1567 Local,
1568 Proxy,
1569 }
1570
1571 #[derive(Debug)]
1572 pub(super) enum SocksProxyError {
1573 SocksNoHostInUrl,
1574 SocksLocalResolve(BoxError),
1575 SocksConnect(BoxError),
1576 }
1577
1578 pub(super) async fn connect(
1579 proxy: Intercepted,
1580 dst: Uri,
1581 dns_mode: DnsResolve,
1582 resolver: &crate::dns::DynResolver,
1583 http_connector: &mut crate::connect::HttpConnector,
1584 ) -> Result<TcpStream, SocksProxyError> {
1585 let https = dst.scheme() == Some(&Scheme::HTTPS);
1586 let original_host = dst.host().ok_or(SocksProxyError::SocksNoHostInUrl)?;
1587 let mut host = original_host.to_owned();
1588 let port = match dst.port() {
1589 Some(p) => p.as_u16(),
1590 None if https => 443u16,
1591 _ => 80u16,
1592 };
1593
1594 if let DnsResolve::Local = dns_mode {
1595 let maybe_new_target = resolver
1596 .http_resolve(&dst)
1597 .await
1598 .map_err(SocksProxyError::SocksLocalResolve)?
1599 .next();
1600 if let Some(new_target) = maybe_new_target {
1601 log::trace!("socks local dns resolved {new_target:?}");
1602 let ip = new_target.ip();
1604 if ip.is_ipv6() {
1605 host = format!("[{}]", ip);
1606 } else {
1607 host = ip.to_string();
1608 }
1609 }
1610 }
1611
1612 let proxy_uri = proxy.uri().clone();
1613 let dst_uri = format!(
1615 "{}://{}:{}",
1616 if https { "https" } else { "http" },
1617 host,
1618 port
1619 )
1620 .parse::<Uri>()
1621 .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1622
1623 match proxy.uri().scheme_str() {
1625 Some("socks4") | Some("socks4a") => {
1626 let mut svc = SocksV4::new(proxy_uri, http_connector);
1627 let stream = Service::call(&mut svc, dst_uri)
1628 .await
1629 .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1630 Ok(stream.into_inner())
1631 }
1632 Some("socks5") | Some("socks5h") => {
1633 let mut svc = if let Some((username, password)) = proxy.raw_auth() {
1634 SocksV5::new(proxy_uri, http_connector)
1635 .with_auth(username.to_string(), password.to_string())
1636 } else {
1637 SocksV5::new(proxy_uri, http_connector)
1638 };
1639 let stream = Service::call(&mut svc, dst_uri)
1640 .await
1641 .map_err(|e| SocksProxyError::SocksConnect(e.into()))?;
1642 Ok(stream.into_inner())
1643 }
1644 _ => unreachable!(),
1645 }
1646 }
1647
1648 impl std::fmt::Display for SocksProxyError {
1649 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1650 match self {
1651 Self::SocksNoHostInUrl => f.write_str("socks proxy destination has no host"),
1652 Self::SocksLocalResolve(_) => f.write_str("error resolving for socks proxy"),
1653 Self::SocksConnect(_) => f.write_str("error connecting to socks proxy"),
1654 }
1655 }
1656 }
1657
1658 impl std::error::Error for SocksProxyError {
1659 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1660 match self {
1661 Self::SocksNoHostInUrl => None,
1662 Self::SocksLocalResolve(ref e) => Some(&**e),
1663 Self::SocksConnect(ref e) => Some(&**e),
1664 }
1665 }
1666 }
1667}
1668
1669mod verbose {
1670 use crate::util::Escape;
1671 use hyper::rt::{Read, ReadBufCursor, Write};
1672 use hyper_util::client::legacy::connect::{Connected, Connection};
1673 use std::cmp::min;
1674 use std::fmt;
1675 use std::io::{self, IoSlice};
1676 use std::pin::Pin;
1677 use std::task::{Context, Poll};
1678
1679 pub(super) const OFF: Wrapper = Wrapper(false);
1680
1681 #[derive(Clone, Copy)]
1682 pub(super) struct Wrapper(pub(super) bool);
1683
1684 impl Wrapper {
1685 pub(super) fn wrap<T: super::AsyncConnWithInfo>(&self, conn: T) -> super::BoxConn {
1686 if self.0 && log::log_enabled!(log::Level::Trace) {
1687 Box::new(Verbose {
1688 id: crate::util::fast_random() as u32,
1690 inner: conn,
1691 })
1692 } else {
1693 Box::new(conn)
1694 }
1695 }
1696 }
1697
1698 struct Verbose<T> {
1699 id: u32,
1700 inner: T,
1701 }
1702
1703 impl<T: Connection + Read + Write + Unpin> Connection for Verbose<T> {
1704 fn connected(&self) -> Connected {
1705 self.inner.connected()
1706 }
1707 }
1708
1709 impl<T: Read + Write + Unpin> Read for Verbose<T> {
1710 fn poll_read(
1711 mut self: Pin<&mut Self>,
1712 cx: &mut Context,
1713 mut buf: ReadBufCursor<'_>,
1714 ) -> Poll<std::io::Result<()>> {
1715 let mut vbuf = hyper::rt::ReadBuf::uninit(unsafe { buf.as_mut() });
1719 match Pin::new(&mut self.inner).poll_read(cx, vbuf.unfilled()) {
1720 Poll::Ready(Ok(())) => {
1721 log::trace!("{:08x} read: {:?}", self.id, Escape::new(vbuf.filled()));
1722 let len = vbuf.filled().len();
1723 unsafe {
1726 buf.advance(len);
1727 }
1728 Poll::Ready(Ok(()))
1729 }
1730 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
1731 Poll::Pending => Poll::Pending,
1732 }
1733 }
1734 }
1735
1736 impl<T: Read + Write + Unpin> Write for Verbose<T> {
1737 fn poll_write(
1738 mut self: Pin<&mut Self>,
1739 cx: &mut Context,
1740 buf: &[u8],
1741 ) -> Poll<Result<usize, std::io::Error>> {
1742 match Pin::new(&mut self.inner).poll_write(cx, buf) {
1743 Poll::Ready(Ok(n)) => {
1744 log::trace!("{:08x} write: {:?}", self.id, Escape::new(&buf[..n]));
1745 Poll::Ready(Ok(n))
1746 }
1747 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
1748 Poll::Pending => Poll::Pending,
1749 }
1750 }
1751
1752 fn poll_write_vectored(
1753 mut self: Pin<&mut Self>,
1754 cx: &mut Context<'_>,
1755 bufs: &[IoSlice<'_>],
1756 ) -> Poll<Result<usize, io::Error>> {
1757 match Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) {
1758 Poll::Ready(Ok(nwritten)) => {
1759 log::trace!(
1760 "{:08x} write (vectored): {:?}",
1761 self.id,
1762 Vectored { bufs, nwritten }
1763 );
1764 Poll::Ready(Ok(nwritten))
1765 }
1766 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
1767 Poll::Pending => Poll::Pending,
1768 }
1769 }
1770
1771 fn is_write_vectored(&self) -> bool {
1772 self.inner.is_write_vectored()
1773 }
1774
1775 fn poll_flush(
1776 mut self: Pin<&mut Self>,
1777 cx: &mut Context,
1778 ) -> Poll<Result<(), std::io::Error>> {
1779 Pin::new(&mut self.inner).poll_flush(cx)
1780 }
1781
1782 fn poll_shutdown(
1783 mut self: Pin<&mut Self>,
1784 cx: &mut Context,
1785 ) -> Poll<Result<(), std::io::Error>> {
1786 Pin::new(&mut self.inner).poll_shutdown(cx)
1787 }
1788 }
1789
1790 #[cfg(feature = "__tls")]
1791 impl<T: super::TlsInfoFactory> super::TlsInfoFactory for Verbose<T> {
1792 fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
1793 self.inner.tls_info()
1794 }
1795 }
1796
1797 struct Vectored<'a, 'b> {
1798 bufs: &'a [IoSlice<'b>],
1799 nwritten: usize,
1800 }
1801
1802 impl fmt::Debug for Vectored<'_, '_> {
1803 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1804 let mut left = self.nwritten;
1805 for buf in self.bufs.iter() {
1806 if left == 0 {
1807 break;
1808 }
1809 let n = min(left, buf.len());
1810 Escape::new(&buf[..n]).fmt(f)?;
1811 left -= n;
1812 }
1813 Ok(())
1814 }
1815 }
1816}