finish functionality for uploader to receive waker handle messages

main
xenofem 2022-04-27 01:17:00 -04:00
parent ba3326ef24
commit 16913bb079
1 changed files with 19 additions and 3 deletions

View File

@ -1,6 +1,6 @@
use std::{collections::HashSet, fmt::Display, io::Write};
use std::{collections::HashSet, fmt::Display, io::Write, task::Waker};
use actix::{Actor, ActorContext, AsyncContext, StreamHandler};
use actix::{Actor, ActorContext, AsyncContext, StreamHandler, Message, Handler};
use actix_http::ws::{CloseReason, Item};
use actix_web_actors::ws::{self, CloseCode};
use log::{debug, error, info, trace};
@ -53,6 +53,7 @@ impl Error {
pub struct Uploader {
writer: Option<Box<dyn LiveWriter>>,
storage_filename: String,
app_data: super::AppData,
bytes_remaining: usize,
}
@ -61,6 +62,7 @@ impl Uploader {
pub fn new(app_data: super::AppData) -> Self {
Self {
writer: None,
storage_filename: String::new(),
app_data,
bytes_remaining: 0,
}
@ -71,6 +73,17 @@ impl Actor for Uploader {
type Context = ws::WebsocketContext<Self>;
}
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) struct WakerMessage(pub Waker);
impl Handler<WakerMessage> for Uploader {
type Result = ();
fn handle(&mut self, msg: WakerMessage, _: &mut Self::Context) {
self.writer.as_mut().map(|w| w.add_waker(msg.0));
}
}
#[derive(Debug, Deserialize)]
struct RawUploadedFile {
name: String,
@ -116,7 +129,9 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
code: CloseCode::Normal,
description: None,
}));
// self.app_data.write().unwrap().entry(
if let Some(f) = self.app_data.write().unwrap().get_mut(&self.storage_filename) {
f.uploader.take();
}
ctx.stop();
}
_ => (),
@ -162,6 +177,7 @@ impl Uploader {
return Err(Error::NoFiles);
}
let storage_filename = Alphanumeric.sample_string(&mut rand::thread_rng(), 8);
self.storage_filename = storage_filename.clone();
let storage_path = super::storage_dir().join(storage_filename.clone());
info!("storing to: {:?}", storage_path);
let writer = super::file::LiveFileWriter::new(&storage_path)?;