From 16913bb079e706e3ee2c3dc10bbca48256b14bab Mon Sep 17 00:00:00 2001 From: xenofem Date: Wed, 27 Apr 2022 01:17:00 -0400 Subject: [PATCH] finish functionality for uploader to receive waker handle messages --- src/upload.rs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/upload.rs b/src/upload.rs index ae08278..1066d76 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -1,6 +1,6 @@ -use std::{collections::HashSet, fmt::Display, io::Write}; +use std::{collections::HashSet, fmt::Display, io::Write, task::Waker}; -use actix::{Actor, ActorContext, AsyncContext, StreamHandler}; +use actix::{Actor, ActorContext, AsyncContext, StreamHandler, Message, Handler}; use actix_http::ws::{CloseReason, Item}; use actix_web_actors::ws::{self, CloseCode}; use log::{debug, error, info, trace}; @@ -53,6 +53,7 @@ impl Error { pub struct Uploader { writer: Option>, + storage_filename: String, app_data: super::AppData, bytes_remaining: usize, } @@ -61,6 +62,7 @@ impl Uploader { pub fn new(app_data: super::AppData) -> Self { Self { writer: None, + storage_filename: String::new(), app_data, bytes_remaining: 0, } @@ -71,6 +73,17 @@ impl Actor for Uploader { type Context = ws::WebsocketContext; } +#[derive(Message)] +#[rtype(result = "()")] +pub(crate) struct WakerMessage(pub Waker); + +impl Handler for Uploader { + type Result = (); + fn handle(&mut self, msg: WakerMessage, _: &mut Self::Context) { + self.writer.as_mut().map(|w| w.add_waker(msg.0)); + } +} + #[derive(Debug, Deserialize)] struct RawUploadedFile { name: String, @@ -116,7 +129,9 @@ impl StreamHandler> for Uploader { code: CloseCode::Normal, description: None, })); - // self.app_data.write().unwrap().entry( + if let Some(f) = self.app_data.write().unwrap().get_mut(&self.storage_filename) { + f.uploader.take(); + } ctx.stop(); } _ => (), @@ -162,6 +177,7 @@ impl Uploader { return Err(Error::NoFiles); } let storage_filename = Alphanumeric.sample_string(&mut rand::thread_rng(), 8); + self.storage_filename = storage_filename.clone(); let storage_path = super::storage_dir().join(storage_filename.clone()); info!("storing to: {:?}", storage_path); let writer = super::file::LiveFileWriter::new(&storage_path)?;