diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 5d56faf..49011df 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "nightly" +channel = "nightly-2025-03-18" \ No newline at end of file diff --git a/src/apps/chat.html b/src/apps/chat.html index 9afc45c..8b3b616 100644 --- a/src/apps/chat.html +++ b/src/apps/chat.html @@ -1,11 +1,15 @@ - - - + + +

Chat

+
+ + +
diff --git a/src/apps/chat.js b/src/apps/chat.js index 2ead47f..a52a188 100644 --- a/src/apps/chat.js +++ b/src/apps/chat.js @@ -1,11 +1,7 @@ // const id = 0; -// const username = "testName"; if (id === undefined) { throw "id is undefined!"; } -// if (username === undefined) { -// throw "username is undefined!"; -// } const ws = new WebSocket("ws://192.254.0.2:" + (9000 + id)); ws.onmessage = (event) => { @@ -13,15 +9,37 @@ ws.onmessage = (event) => { if (typeof event.data == "string") { let msg = JSON.parse(event.data); if (event.data[0] == "[") { - let usernames = []; - for (u of msg) { - if (u.length() > 0) { + // Handle list of users + users = msg; + let usersElems = []; + for (u of users) { + if (typeof u == "string") { let un = document.createElement("div"); un.innerText = u; - usernames.push(un); + usersElems.push(un); } } - document.getElementById("usernames").replaceChildren(...usernames); + document.getElementById("users").replaceChildren(...usersElems); + } else { + // Handle message + let msgs = document.getElementById("msgs"); + let elem = document.createElement("p"); + elem["data-id"] = msg.id; + elem.innerHTML = + "" + users[msg.author] + " : " + msg.content; + for (c of msgs.children) { + if (c["data-id"] > msg.id) { + msgs.insertBefore(elem, c); + return; + } + } + msgs.appendChild(elem); } } }; + +document.getElementById("send").onsubmit = (event) => { + event.preventDefault(); + // console.log(event, document.getElementById("sendcontent").value); + ws.send("send " + document.getElementById("sendcontent").value); +}; diff --git a/src/apps/chat.rs b/src/apps/chat.rs index 41c7f7d..d5f6590 100644 --- a/src/apps/chat.rs +++ b/src/apps/chat.rs @@ -1,10 +1,11 @@ -use core::str::from_utf8_unchecked; +use core::{str::from_utf8_unchecked, sync::atomic::Ordering}; -use embassy_sync::{blocking_mutex::raw::ThreadModeRawMutex, mutex::Mutex}; -use embassy_time::Timer; +use embassy_sync::{blocking_mutex::raw::ThreadModeRawMutex, mutex::Mutex, signal::Signal}; +use embassy_time::{Duration, Timer}; use heapless::String; use log::{info, warn}; use pico_website::{unimplemented, unwrap, unwrap_opt}; +use portable_atomic::AtomicUsize; use serde::Serialize; use crate::{apps::App, socket::ws::WsMsg}; @@ -50,6 +51,7 @@ impl Msgs { self.inner[self.head..self.head + 2].copy_from_slice(&(content.len() as u16).to_le_bytes()); self.inner[self.head + 2] = author + 1; self.head += 3; + self.next_msg += 1; } // Iter messages from present to past fn iter(&self) -> MsgsIter { @@ -70,6 +72,8 @@ impl Msgs { } } } + +#[derive(Debug)] struct MsgsIter<'a> { msgs: &'a Msgs, /// next byte index @@ -111,6 +115,7 @@ impl<'a> Iterator for MsgsIter<'a> { self.finished = true; return None; } + let id = self.current_id; if self.current_id == 0 { self.finished = true; } else { @@ -118,7 +123,7 @@ impl<'a> Iterator for MsgsIter<'a> { } Some(Msg { - id: self.current_id, + id, author, content, }) @@ -145,6 +150,7 @@ impl Usernames { if *un == None { *un = Some(String::new()); un.as_mut().unwrap().push_str(name).unwrap(); + USERNAMES_VERSION.add(1, Ordering::Relaxed); return Some(i as u8); } } @@ -152,16 +158,22 @@ impl Usernames { } } pub static USERNAMES: Mutex = Mutex::new(Usernames::default()); +pub static USERNAMES_VERSION: AtomicUsize = AtomicUsize::new(0); pub struct ChatApp { id: u8, json_buf: [u8; 48 + USERNAME_MAX_LEN + MSG_MAX_SIZE], + /// Id of the next message to send to client (so that 0 means no message has been sent) + next_msg: usize, + usernames_version: usize, } impl ChatApp { pub fn new(id: u8) -> Self { Self { id, json_buf: [0; _], + next_msg: 0, + usernames_version: 0, } } } @@ -170,43 +182,35 @@ impl App for ChatApp { "chat" } fn accept_ws(&self, path: &str) -> bool { - path.len() > 1 && path.len() <= 17 + path == "/" + // path.len() > 1 && path.len() <= 17 } - async fn handle_ws<'a, const BUF_SIZE: usize, const RES_HEAD_BUF_SIZE: usize>( + async fn handle_ws<'a, const BUF_SIZE: usize>( &'a mut self, _path: &str, - mut ws: crate::socket::ws::Ws<'a, BUF_SIZE, RES_HEAD_BUF_SIZE>, + mut ws: crate::socket::ws::Ws<'a, BUF_SIZE>, ) { Timer::after_millis(500).await; let r: Result<(), ()> = try { - // Send all messages at the beginning - { - let msgs = MSGS.lock().await; - for m in msgs.iter() { - let n = unwrap(serde_json_core::to_slice(&m, &mut self.json_buf)).await; - let json = - unsafe { from_utf8_unchecked(&unwrap_opt(self.json_buf.get(..n)).await) }; - ws.send(WsMsg::Text(json)).await?; - } - } - { - let usernames = USERNAMES.lock().await; - let n = unwrap(serde_json_core::to_slice(&(*usernames), &mut self.json_buf)).await; - let json = - unsafe { from_utf8_unchecked(&unwrap_opt(self.json_buf.get(..n)).await) }; - ws.send(WsMsg::Text(json)).await?; - } - loop { - Timer::after_secs(1).await; + Timer::after_millis(1).await; + { + let uv = USERNAMES_VERSION.load(Ordering::Relaxed); + if self.usernames_version < uv { + ws.send_json(&(*USERNAMES.lock().await)).await?; + } + } { let msgs = MSGS.lock().await; - info!("{:?}", msgs); - Timer::after_millis(100).await; for m in msgs.iter() { - info!("{:?}", m); - Timer::after_millis(100).await; + if m.id >= self.next_msg { + ws.send_json(&m).await?; + } } + self.next_msg = msgs.next_msg; + } + if ws.last_msg.elapsed() >= Duration::from_secs(5) { + ws.send(WsMsg::Ping(&[])).await?; } while let Some(r) = ws.rcv().await? { info!("{:?}", r); @@ -216,23 +220,12 @@ impl App for ChatApp { warn!("Message too long! (len={})", r.len() - 5); return; } - MSGS.lock() - .await - .push(self.id, r.get(5..).unwrap_or_default()); + { + MSGS.lock() + .await + .push(self.id, r.get(5..).unwrap_or_default()); + } } - // if r.starts_with("reqmsg ") { - // let Ok(msg_id) = r.get(7..).unwrap_or_default().parse::() else { - // warn!("Invalid requested message : {}", r); - // return; - // }; - // // 0 is next msg - // let msg_rel_id = if msg_id >= 0 { - - // } - // if msg_id < 0 { - - // } - // } } } } diff --git a/src/apps/index.rs b/src/apps/index.rs index db0c164..029f6ef 100644 --- a/src/apps/index.rs +++ b/src/apps/index.rs @@ -50,9 +50,11 @@ impl App for IndexApp { name = Some(n); } } - Some(("id", id)) => { - if let Ok(id) = id.parse::() { - if id < apps::chat::USERS_LEN {} + Some(("id", i)) => { + if let Ok(i) = i.parse::() { + if i < apps::chat::USERS_LEN { + id = Some(i); + } } } _ => {} @@ -130,7 +132,7 @@ impl App for IndexApp { let mut content = Vec::new(); let r: Result<(), &str> = try { content.push("const id = ")?; - content.push(id)?; + content.push(id_to_static_str(id).await)?; content.push(";")?; // #[cfg(debug_assertions)] content.push(include_str!("chat.js"))?; diff --git a/src/apps/mod.rs b/src/apps/mod.rs index 98e1e0d..c65d724 100644 --- a/src/apps/mod.rs +++ b/src/apps/mod.rs @@ -21,10 +21,10 @@ pub trait App { fn accept_ws(&self, _path: &str) -> bool { false } - async fn handle_ws<'a, const BUF_SIZE: usize, const RES_HEAD_BUF_SIZE: usize>( + async fn handle_ws<'a, const BUF_SIZE: usize>( &'a mut self, _path: &str, - _ws: Ws<'a, BUF_SIZE, RES_HEAD_BUF_SIZE>, + _ws: Ws<'a, BUF_SIZE>, ) { } } diff --git a/src/apps/ttt.html b/src/apps/ttt.html index cfcdf1d..d86a69a 100644 --- a/src/apps/ttt.html +++ b/src/apps/ttt.html @@ -1,28 +1,28 @@ - - - - + + + +

TicTacToe

diff --git a/src/apps/ttt.rs b/src/apps/ttt.rs index 0471802..ee15171 100644 --- a/src/apps/ttt.rs +++ b/src/apps/ttt.rs @@ -91,10 +91,10 @@ impl App for TttApp { fn accept_ws(&self, path: &str) -> bool { (self.team == Team::Zero && path == "/blue") || (self.team == Team::One && path == "/red") } - async fn handle_ws<'a, const BUF_SIZE: usize, const RES_HEAD_BUF_SIZE: usize>( + async fn handle_ws<'a, const BUF_SIZE: usize>( &'a mut self, _path: &str, - mut ws: Ws<'a, BUF_SIZE, RES_HEAD_BUF_SIZE>, + mut ws: Ws<'a, BUF_SIZE>, ) { Timer::after_millis(500).await; let r: Result<(), ()> = try { diff --git a/src/main.rs b/src/main.rs index b0bfeba..8a5b451 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,8 @@ #![feature(try_blocks)] #![feature(impl_trait_in_bindings)] #![feature(array_repeat)] +#![feature(generic_arg_infer)] +#![feature(async_iterator)] #[cfg(feature = "wifi-connect")] use core::net::Ipv4Addr; diff --git a/src/socket.rs b/src/socket.rs index f12cd7c..0dbdfa4 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -227,11 +227,8 @@ pub async fn listen_task(stack: embassy_net::Stack<'static>, mut app: impl apps: } } if let Some(path) = ws_path { - app.handle_ws( - &path, - Ws::new(&mut socket, &mut buf, &mut head_buf, app.socket_name()), - ) - .await; + app.handle_ws(&path, Ws::new(&mut socket, &mut buf, app.socket_name())) + .await; } } } diff --git a/src/socket/ws.rs b/src/socket/ws.rs index 71f2c5c..207347e 100644 --- a/src/socket/ws.rs +++ b/src/socket/ws.rs @@ -1,4 +1,4 @@ -use core::str::from_utf8; +use core::str::{from_utf8, from_utf8_unchecked}; use embassy_net::tcp::{TcpReader, TcpSocket, TcpWriter}; use embassy_time::{Instant, Timer}; @@ -16,6 +16,10 @@ pub enum WsMsg<'a> { Unknown(u8, &'a [u8]), } impl WsMsg<'_> { + const TEXT: u8 = 1; + const BYTES: u8 = 2; + const PING: u8 = 9; + const PONG: u8 = 10; pub const fn len(&self) -> usize { self.as_bytes().len() } @@ -41,47 +45,73 @@ struct WsRx<'a, const BUF_SIZE: usize> { buf: &'a mut [u8; BUF_SIZE], msg_in_buf: Option<(usize, usize)>, // (start, length) } -struct WsTx<'a, const HEAD_BUF_SIZE: usize> { +struct WsTx<'a> { socket: TcpWriter<'a>, - head_buf: &'a mut Vec, } -impl<'a, const HEAD_BUF_SIZE: usize> WsTx<'a, HEAD_BUF_SIZE> { - pub async fn send<'m>(&mut self, msg: WsMsg<'m>) -> Result<(), ()> { - self.head_buf.clear(); - unwrap(self.head_buf.push(0b1000_0000 | msg.code())).await; - if msg.len() < 126 { - unwrap(self.head_buf.push(msg.len() as u8)).await; +impl<'a> WsTx<'a> { + pub async fn send_with Result>( + &mut self, + msg_code: u8, + f: F, + ) -> Result<(), ()> { + if self.send_with_no_flush(msg_code, &f).await.is_err() { + self.socket.flush().await.map_err(|_| ())?; + self.send_with_no_flush(msg_code, f).await } else { - unwrap(self.head_buf.push(0b0111_1110)).await; - unwrap( - self.head_buf - .extend_from_slice(&(msg.len() as u16).to_le_bytes()), - ) - .await; + Ok(()) } - let w: Result<(), as ErrorType>::Error> = try { - self.socket.write_all(&self.head_buf).await?; - self.socket.write_all(msg.as_bytes()).await?; - }; - w.map_err(|e| { - warn!("write error: {:?}", e); + } + pub async fn send_with_no_flush Result>( + &mut self, + msg_code: u8, + f: F, + ) -> Result<(), ()> { + self.socket + .write_with(|buf| { + buf[0] = 0b1000_0000 | msg_code; + let Ok(n) = f(&mut buf[4..]) else { + return (0, Err(())); + }; + if n < 126 { + buf[1] = n as u8; + buf.copy_within(4..4 + n, 2); + (n + 2, Ok(())) + } else { + buf[1..=2].copy_from_slice(&(n as u16).to_le_bytes()); + (n + 4, Ok(())) + } + }) + .await + .map_err(|_| ())? + } + pub async fn send<'m>(&mut self, msg: WsMsg<'m>) -> Result<(), ()> { + self.send_with(msg.code(), |buf| { + let msg = msg.as_bytes(); + if buf.len() < msg.len() { + Err(()) + } else { + buf[..msg.len()].copy_from_slice(msg); + Ok(msg.len()) + } }) + .await + } + pub async fn send_json(&mut self, msg: &T) -> Result<(), ()> { + self.send_with(WsMsg::TEXT, |buf| { + serde_json_core::to_slice(msg, buf).map_err(|_| ()) + }) + .await } } -pub struct Ws<'a, const BUF_SIZE: usize = 1024, const RES_HEAD_BUF_SIZE: usize = 256> { +pub struct Ws<'a, const BUF_SIZE: usize = 1024> { rx: WsRx<'a, BUF_SIZE>, - tx: WsTx<'a, RES_HEAD_BUF_SIZE>, + tx: WsTx<'a>, pub last_msg: Instant, name: &'a str, } -impl<'a, const BUF_SIZE: usize, const HEAD_BUF_SIZE: usize> Ws<'a, BUF_SIZE, HEAD_BUF_SIZE> { - pub fn new( - socket: &'a mut TcpSocket, - buf: &'a mut [u8; BUF_SIZE], - head_buf: &'a mut Vec, - name: &'a str, - ) -> Self { +impl<'a, const BUF_SIZE: usize> Ws<'a, BUF_SIZE> { + pub fn new(socket: &'a mut TcpSocket, buf: &'a mut [u8; BUF_SIZE], name: &'a str) -> Self { let (rx, tx) = socket.split(); Self { rx: WsRx { @@ -89,10 +119,7 @@ impl<'a, const BUF_SIZE: usize, const HEAD_BUF_SIZE: usize> Ws<'a, BUF_SIZE, HEA buf, msg_in_buf: None, }, - tx: WsTx { - socket: tx, - head_buf, - }, + tx: WsTx { socket: tx }, last_msg: Instant::MIN, name, } @@ -228,4 +255,27 @@ impl<'a, const BUF_SIZE: usize, const HEAD_BUF_SIZE: usize> Ws<'a, BUF_SIZE, HEA self.last_msg = Instant::now(); Ok(()) } + pub async fn send_json(&mut self, msg: &T) -> Result<(), ()> { + self.tx.send_json(msg).await?; + self.last_msg = Instant::now(); + Ok(()) + } + pub async fn send_with Result>( + &mut self, + msg_code: u8, + f: F, + ) -> Result<(), ()> { + self.tx.send_with(msg_code, f).await?; + self.last_msg = Instant::now(); + Ok(()) + } + pub async fn send_with_no_flush Result>( + &mut self, + msg_code: u8, + f: F, + ) -> Result<(), ()> { + self.tx.send_with_no_flush(msg_code, f).await?; + self.last_msg = Instant::now(); + Ok(()) + } }