backend/
main.rs

1//! The backend of `PermaplanT`.
2
3#![recursion_limit = "1024"]
4// Enable all lints apart from clippy::restriction by default.
5// See https://rust-lang.github.io/rust-clippy/master/index.html#blanket_clippy_restriction_lints for as to why restriction is not enabled.
6#![warn(clippy::pedantic)]
7#![warn(clippy::nursery)]
8#![warn(clippy::cargo)]
9// Lints in clippy::restriction which seem useful.
10#![warn(
11    clippy::clone_on_ref_ptr,
12    clippy::empty_structs_with_brackets,
13    clippy::exit,
14    clippy::expect_used,
15    clippy::format_push_string,
16    clippy::get_unwrap,
17    clippy::if_then_some_else_none,
18    clippy::indexing_slicing,
19    clippy::integer_division,
20    clippy::implicit_clone,
21    clippy::large_include_file,
22    clippy::missing_docs_in_private_items,
23    clippy::mixed_read_write_in_expression,
24    clippy::multiple_inherent_impl,
25    clippy::mutex_atomic,
26    clippy::panic_in_result_fn,
27    clippy::partial_pub_fields,
28    clippy::print_stderr,
29    clippy::print_stdout,
30    clippy::rc_buffer,
31    clippy::rc_mutex,
32    clippy::rest_pat_in_fully_bound_structs,
33    clippy::same_name_method,
34    clippy::shadow_unrelated,
35    clippy::str_to_string,
36    clippy::suspicious_xor_used_as_pow,
37    clippy::todo,
38    clippy::try_err,
39    clippy::unimplemented,
40    clippy::unnecessary_self_imports,
41    clippy::unneeded_field_pattern,
42    clippy::unreachable,
43    clippy::unseparated_literal_suffix,
44    clippy::unwrap_in_result,
45    clippy::unwrap_used,
46    clippy::use_debug,
47    clippy::verbose_file_reads
48)]
49// Cannot fix some errors because dependencies import them.
50#![allow(clippy::multiple_crate_versions)]
51// Clippy suggests lots of false "x.get(0)" => "x.first()"
52#![allow(clippy::get_first)]
53// We often want the same name per module (for instance every enum).
54#![allow(clippy::module_name_repetitions)]
55
56use actix_cors::Cors;
57use actix_web::{http, middleware::Logger, App, HttpServer};
58use config::{api_doc, auth::Config, routes};
59use db::{
60    connection::Pool,
61    cronjobs::{cleanup_layers, cleanup_maps},
62};
63use std::sync::Arc;
64
65use opentelemetry::propagation::TextMapCompositePropagator;
66use opentelemetry::{global, KeyValue};
67use opentelemetry_instrumentation_actix_web::{RequestMetrics, RequestTracing};
68use opentelemetry_resource_detectors::{
69    HostResourceDetector, OsResourceDetector, ProcessResourceDetector,
70};
71use opentelemetry_sdk::metrics::SdkMeterProvider;
72use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
73use opentelemetry_sdk::trace::SdkTracerProvider;
74use opentelemetry_sdk::Resource;
75
76pub mod config;
77pub mod controller;
78pub mod db;
79pub mod error;
80pub mod keycloak_api;
81pub mod model;
82/// Auto generated by diesel.
83#[allow(clippy::wildcard_imports)]
84#[allow(clippy::missing_docs_in_private_items)]
85pub mod schema;
86pub mod service;
87pub mod sse;
88#[cfg(test)]
89pub mod test;
90
91/// OpenTelemetry resource describing this service.
92static RESOURCE: std::sync::LazyLock<Resource> = std::sync::LazyLock::new(|| {
93    const NAME: &str = env!("CARGO_PKG_NAME");
94    const VERSION: &str = env!("CARGO_PKG_VERSION");
95    const BUILD_DATE: &str = env!("BUILD_DATE");
96    Resource::builder()
97        .with_detector(Box::new(OsResourceDetector))
98        .with_detector(Box::new(HostResourceDetector::default()))
99        .with_detector(Box::new(ProcessResourceDetector))
100        .with_attributes(vec![
101            KeyValue::new("service.name", NAME),
102            KeyValue::new("service.version", VERSION),
103            KeyValue::new("service.build_date", BUILD_DATE),
104        ])
105        .build()
106});
107
108/// Main function.
109#[actix_web::main]
110async fn main() -> std::io::Result<()> {
111    env_logger::init();
112
113    log::debug!("Initializing telemetry ...");
114
115    // Initialize OpenTelemetry
116    let tracer_provider = init_trace();
117    let tracer_provider = match tracer_provider {
118        Ok(tp) => tp,
119        Err(e) => {
120            log::error!("Error initializing OpenTelemetry tracing: {e}");
121            std::process::exit(1);
122        }
123    };
124
125    global::set_tracer_provider(tracer_provider.clone());
126
127    let meter_provider = init_metrics();
128    let meter_provider = match meter_provider {
129        Ok(mp) => mp,
130        Err(e) => {
131            log::error!("Error initializing OpenTelemetry metrics: {e}");
132            std::process::exit(1);
133        }
134    };
135
136    global::set_meter_provider(meter_provider.clone());
137
138    opentelemetry_instrumentation_tokio::observe_current_runtime();
139
140    log::debug!("Starting PermaplanT backend...");
141
142    let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
143    let build_date = option_env!("BUILD_DATE").unwrap_or("unknown");
144
145    let config = match config::app::Config::from_env() {
146        Ok(config) => config,
147        Err(e) => {
148            return Err(std::io::Error::other(format!(
149                "Error reading configuration: {e}"
150            )));
151        }
152    };
153
154    log::info!("Configuration loaded: {config:#?}");
155
156    Config::init(&config).await;
157
158    log::info!("Version   : {version}");
159    log::info!("Build date: {build_date}");
160
161    let data_init = config::data::init(&config);
162    let pool = data_init.pool.clone().into_inner();
163    start_cronjobs(pool);
164
165    log::info!(
166        "Binding to: {}:{}",
167        config.bind_address.0,
168        config.bind_address.1
169    );
170
171    let broadcaster = data_init.broadcaster.clone();
172
173    let server = HttpServer::new(move || {
174        App::new()
175            .wrap(Logger::default())
176            .wrap(RequestTracing::new())
177            .wrap(RequestMetrics::default())
178            .wrap(cors_configuration())
179            .app_data(data_init.pool.clone())
180            .app_data(data_init.broadcaster.clone())
181            .app_data(data_init.http_client.clone())
182            .app_data(data_init.keycloak_api.clone())
183            .app_data(data_init.mode.clone())
184            .configure(routes::config)
185            .configure(api_doc::config)
186    })
187    .disable_signals()
188    .shutdown_timeout(5)
189    .bind(config.bind_address)?
190    .run();
191
192    let srv_handle = server.handle();
193
194    actix_web::rt::spawn(async move {
195        if let Err(err) = shutdown_signal().await {
196            log::error!("Failed to listen for shutdown signals: {err}");
197            return;
198        }
199        log::info!("Received shutdown signal, notifying all connected clients");
200        let action = model::dto::actions::Action::new_shutdown_signal_action(
201            "Server is shutting down".to_owned(),
202        );
203        broadcaster.broadcast_all_maps(action).await;
204        log::info!("Shutdown notification sent to all clients");
205        srv_handle.stop(true).await;
206    });
207
208    server.await?;
209
210    let _ = tracer_provider.shutdown();
211    let _ = meter_provider.shutdown();
212
213    Ok(())
214}
215
216/// Create a CORS configuration for the server.
217fn cors_configuration() -> Cors {
218    let mut cors = Cors::default()
219        // 1. Define allowed origin
220        .allowed_origin("http://localhost:5173")
221        .allowed_origin("https://cloud.permaplant.net")
222        // 2. Add methods and headers
223        .allowed_methods(vec!["GET", "POST", "PUT", "PATCH", "DELETE"])
224        .allowed_headers(vec![
225            http::header::AUTHORIZATION,
226            http::header::ACCEPT,
227            http::header::CONTENT_TYPE,
228        ])
229        .allowed_header("traceparent")
230        .allowed_header("tracestate")
231        .max_age(3600);
232
233    // 3. Programmatically add the repetitive subdomains
234    let subdomains = ["mr", "dev", "master", "www", "experimental", "telemetry"];
235    for sub in subdomains {
236        cors = cors.allowed_origin(&format!("https://{sub}.permaplant.net"));
237        cors = cors.allowed_origin(&format!("https://{sub}.staging.permaplant.net"));
238    }
239
240    cors
241}
242
243/// Start all scheduled jobs that get run in the backend.
244fn start_cronjobs(pool: Arc<Pool>) {
245    tokio::spawn(cleanup_maps(Arc::clone(&pool)));
246    tokio::spawn(cleanup_layers(pool));
247}
248
249/// Initialize OpenTelemetry tracing.
250fn init_trace() -> Result<SdkTracerProvider, Box<dyn std::error::Error + Send + Sync>> {
251    let baggage_propagator = BaggagePropagator::new();
252    let trace_context_propagator = TraceContextPropagator::new();
253    let composite_propagator = TextMapCompositePropagator::new(vec![
254        Box::new(baggage_propagator),
255        Box::new(trace_context_propagator),
256    ]);
257
258    global::set_text_map_propagator(composite_propagator);
259
260    let exporter = opentelemetry_otlp::SpanExporter::builder()
261        .with_http()
262        .build()?;
263
264    let tr = SdkTracerProvider::builder()
265        .with_batch_exporter(exporter)
266        .with_resource(RESOURCE.clone())
267        .build();
268    Ok(tr)
269}
270
271/// Initialize OpenTelemetry metrics.
272fn init_metrics() -> Result<SdkMeterProvider, Box<dyn std::error::Error + Send + Sync>> {
273    let exporter = opentelemetry_otlp::MetricExporter::builder()
274        .with_http()
275        .build()?;
276
277    let mt = SdkMeterProvider::builder()
278        .with_periodic_exporter(exporter)
279        .with_resource(RESOURCE.clone())
280        .build();
281    Ok(mt)
282}
283
284/// Wait for shutdown signal.
285async fn shutdown_signal() -> std::io::Result<()> {
286    let ctrl_c = async {
287        tokio::signal::ctrl_c().await?;
288        Ok::<(), std::io::Error>(())
289    };
290
291    let unix_signal = async {
292        use tokio::signal::unix::{signal, SignalKind};
293
294        let mut sigterm = signal(SignalKind::terminate())?;
295        let mut sighup = signal(SignalKind::hangup())?;
296
297        tokio::select! {
298            _ = sigterm.recv() => {},
299            _ = sighup.recv() => {},
300        }
301
302        Ok::<(), std::io::Error>(())
303    };
304
305    tokio::select! {
306        res = ctrl_c => res?,
307        res = unix_signal => res?,
308    }
309
310    Ok(())
311}