use super::events::{Connect, Disconnect, Event, EventBroker, ID}; use crate::auth::AuthClaims; use actix::*; use actix_web::{ web::{self, Data}, Error, HttpRequest, HttpResponse, }; use actix_web_actors::ws; use jsonwebtoken::{decode, DecodingKey, Validation}; pub struct EventSession { id: ID, } impl Actor for EventSession { type Context = ws::WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { let addr = ctx.address(); EventBroker::from_registry() .send(Connect { addr: addr.recipient(), }) .into_actor(self) .then(|res, act, ctx| { match res { Ok(res) => act.id = res, // something is wrong. Burn it with fire. _ => ctx.stop(), } fut::ready(()) }) .wait(ctx); } fn stopping(&mut self, _: &mut Self::Context) -> Running { // notify suprivisor we are leavin EventBroker::from_registry().do_send(Disconnect { id: self.id }); Running::Stop } } impl Handler for EventSession { type Result = (); fn handle(&mut self, msg: Event, ctx: &mut Self::Context) -> Self::Result { let data = serde_json::to_string(&msg).unwrap_or_default(); ctx.text(data); } } impl StreamHandler> for EventSession { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), Ok(ws::Message::Text(text)) => ctx.text(text), Ok(ws::Message::Binary(bin)) => ctx.binary(bin), _ => (), } } } #[tracing::instrument(skip(req, stream, state))] pub async fn event_session_index( req: HttpRequest, stream: web::Payload, state: Data, path: web::Path, ) -> Result { let secret = crate::auth::get_secret(&state.db).await?; let decoded = decode::( &path, &DecodingKey::from_secret(&secret), &Validation::default(), ); match decoded { Ok(_) => ws::start(EventSession { id: 0 }, &req, stream), Err(e) => Err(actix_web::error::ErrorUnauthorized(e.to_string())), } }