things + rework ws

This commit is contained in:
Arkitu 2025-08-16 12:18:15 +02:00
parent 9c8a7af782
commit f5fa4647f4
11 changed files with 195 additions and 129 deletions

View File

@ -1,2 +1,2 @@
[toolchain]
channel = "nightly"
channel = "nightly-2025-03-18"

View File

@ -1,11 +1,15 @@
<!doctype html>
<html>
<head>
<script src="/chat.js" defer></script>
</head>
<html>
<body>
<h1>Chat</h1>
<div id="users"></div>
<div id="msgs"></div>
<form id="send">
<input id="sendcontent" name="content" autofocus required />
<input type="submit" value="Send" />
</form>
</body>
</html>

View File

@ -1,11 +1,7 @@
// const id = 0;
// const username = "testName";
if (id === undefined) {
throw "id is undefined!";
}
// if (username === undefined) {
// throw "username is undefined!";
// }
const ws = new WebSocket("ws://192.254.0.2:" + (9000 + id));
ws.onmessage = (event) => {
@ -13,15 +9,37 @@ ws.onmessage = (event) => {
if (typeof event.data == "string") {
let msg = JSON.parse(event.data);
if (event.data[0] == "[") {
let usernames = [];
for (u of msg) {
if (u.length() > 0) {
// Handle list of users
users = msg;
let usersElems = [];
for (u of users) {
if (typeof u == "string") {
let un = document.createElement("div");
un.innerText = u;
usernames.push(un);
usersElems.push(un);
}
}
document.getElementById("usernames").replaceChildren(...usernames);
document.getElementById("users").replaceChildren(...usersElems);
} else {
// Handle message
let msgs = document.getElementById("msgs");
let elem = document.createElement("p");
elem["data-id"] = msg.id;
elem.innerHTML =
"<span>" + users[msg.author] + " :</span> " + msg.content;
for (c of msgs.children) {
if (c["data-id"] > msg.id) {
msgs.insertBefore(elem, c);
return;
}
}
msgs.appendChild(elem);
}
}
};
document.getElementById("send").onsubmit = (event) => {
event.preventDefault();
// console.log(event, document.getElementById("sendcontent").value);
ws.send("send " + document.getElementById("sendcontent").value);
};

View File

@ -1,10 +1,11 @@
use core::str::from_utf8_unchecked;
use core::{str::from_utf8_unchecked, sync::atomic::Ordering};
use embassy_sync::{blocking_mutex::raw::ThreadModeRawMutex, mutex::Mutex};
use embassy_time::Timer;
use embassy_sync::{blocking_mutex::raw::ThreadModeRawMutex, mutex::Mutex, signal::Signal};
use embassy_time::{Duration, Timer};
use heapless::String;
use log::{info, warn};
use pico_website::{unimplemented, unwrap, unwrap_opt};
use portable_atomic::AtomicUsize;
use serde::Serialize;
use crate::{apps::App, socket::ws::WsMsg};
@ -50,6 +51,7 @@ impl Msgs {
self.inner[self.head..self.head + 2].copy_from_slice(&(content.len() as u16).to_le_bytes());
self.inner[self.head + 2] = author + 1;
self.head += 3;
self.next_msg += 1;
}
// Iter messages from present to past
fn iter(&self) -> MsgsIter {
@ -70,6 +72,8 @@ impl Msgs {
}
}
}
#[derive(Debug)]
struct MsgsIter<'a> {
msgs: &'a Msgs,
/// next byte index
@ -111,6 +115,7 @@ impl<'a> Iterator for MsgsIter<'a> {
self.finished = true;
return None;
}
let id = self.current_id;
if self.current_id == 0 {
self.finished = true;
} else {
@ -118,7 +123,7 @@ impl<'a> Iterator for MsgsIter<'a> {
}
Some(Msg {
id: self.current_id,
id,
author,
content,
})
@ -145,6 +150,7 @@ impl Usernames {
if *un == None {
*un = Some(String::new());
un.as_mut().unwrap().push_str(name).unwrap();
USERNAMES_VERSION.add(1, Ordering::Relaxed);
return Some(i as u8);
}
}
@ -152,16 +158,22 @@ impl Usernames {
}
}
pub static USERNAMES: Mutex<ThreadModeRawMutex, Usernames> = Mutex::new(Usernames::default());
pub static USERNAMES_VERSION: AtomicUsize = AtomicUsize::new(0);
pub struct ChatApp {
id: u8,
json_buf: [u8; 48 + USERNAME_MAX_LEN + MSG_MAX_SIZE],
/// Id of the next message to send to client (so that 0 means no message has been sent)
next_msg: usize,
usernames_version: usize,
}
impl ChatApp {
pub fn new(id: u8) -> Self {
Self {
id,
json_buf: [0; _],
next_msg: 0,
usernames_version: 0,
}
}
}
@ -170,44 +182,36 @@ impl App for ChatApp {
"chat"
}
fn accept_ws(&self, path: &str) -> bool {
path.len() > 1 && path.len() <= 17
path == "/"
// path.len() > 1 && path.len() <= 17
}
async fn handle_ws<'a, const BUF_SIZE: usize, const RES_HEAD_BUF_SIZE: usize>(
async fn handle_ws<'a, const BUF_SIZE: usize>(
&'a mut self,
_path: &str,
mut ws: crate::socket::ws::Ws<'a, BUF_SIZE, RES_HEAD_BUF_SIZE>,
mut ws: crate::socket::ws::Ws<'a, BUF_SIZE>,
) {
Timer::after_millis(500).await;
let r: Result<(), ()> = try {
// Send all messages at the beginning
{
let msgs = MSGS.lock().await;
for m in msgs.iter() {
let n = unwrap(serde_json_core::to_slice(&m, &mut self.json_buf)).await;
let json =
unsafe { from_utf8_unchecked(&unwrap_opt(self.json_buf.get(..n)).await) };
ws.send(WsMsg::Text(json)).await?;
}
}
{
let usernames = USERNAMES.lock().await;
let n = unwrap(serde_json_core::to_slice(&(*usernames), &mut self.json_buf)).await;
let json =
unsafe { from_utf8_unchecked(&unwrap_opt(self.json_buf.get(..n)).await) };
ws.send(WsMsg::Text(json)).await?;
}
loop {
Timer::after_secs(1).await;
Timer::after_millis(1).await;
{
let uv = USERNAMES_VERSION.load(Ordering::Relaxed);
if self.usernames_version < uv {
ws.send_json(&(*USERNAMES.lock().await)).await?;
}
}
{
let msgs = MSGS.lock().await;
info!("{:?}", msgs);
Timer::after_millis(100).await;
for m in msgs.iter() {
info!("{:?}", m);
Timer::after_millis(100).await;
if m.id >= self.next_msg {
ws.send_json(&m).await?;
}
}
self.next_msg = msgs.next_msg;
}
if ws.last_msg.elapsed() >= Duration::from_secs(5) {
ws.send(WsMsg::Ping(&[])).await?;
}
while let Some(r) = ws.rcv().await? {
info!("{:?}", r);
if let WsMsg::Text(r) = r {
@ -216,23 +220,12 @@ impl App for ChatApp {
warn!("Message too long! (len={})", r.len() - 5);
return;
}
{
MSGS.lock()
.await
.push(self.id, r.get(5..).unwrap_or_default());
}
// if r.starts_with("reqmsg ") {
// let Ok(msg_id) = r.get(7..).unwrap_or_default().parse::<isize>() else {
// warn!("Invalid requested message : {}", r);
// return;
// };
// // 0 is next msg
// let msg_rel_id = if msg_id >= 0 {
// }
// if msg_id < 0 {
// }
// }
}
}
}
}

View File

@ -50,9 +50,11 @@ impl App for IndexApp {
name = Some(n);
}
}
Some(("id", id)) => {
if let Ok(id) = id.parse::<u8>() {
if id < apps::chat::USERS_LEN {}
Some(("id", i)) => {
if let Ok(i) = i.parse::<u8>() {
if i < apps::chat::USERS_LEN {
id = Some(i);
}
}
}
_ => {}
@ -130,7 +132,7 @@ impl App for IndexApp {
let mut content = Vec::new();
let r: Result<(), &str> = try {
content.push("const id = ")?;
content.push(id)?;
content.push(id_to_static_str(id).await)?;
content.push(";")?;
// #[cfg(debug_assertions)]
content.push(include_str!("chat.js"))?;

View File

@ -21,10 +21,10 @@ pub trait App {
fn accept_ws(&self, _path: &str) -> bool {
false
}
async fn handle_ws<'a, const BUF_SIZE: usize, const RES_HEAD_BUF_SIZE: usize>(
async fn handle_ws<'a, const BUF_SIZE: usize>(
&'a mut self,
_path: &str,
_ws: Ws<'a, BUF_SIZE, RES_HEAD_BUF_SIZE>,
_ws: Ws<'a, BUF_SIZE>,
) {
}
}

View File

@ -1,4 +1,5 @@
<!doctype html>
<html>
<head>
<style type="text/css">
body {
@ -22,7 +23,6 @@
</style>
<script src="/ttt.js" defer></script>
</head>
<html>
<body>
<h1>TicTacToe</h1>
<h3 id="team"></h3>

View File

@ -91,10 +91,10 @@ impl App for TttApp {
fn accept_ws(&self, path: &str) -> bool {
(self.team == Team::Zero && path == "/blue") || (self.team == Team::One && path == "/red")
}
async fn handle_ws<'a, const BUF_SIZE: usize, const RES_HEAD_BUF_SIZE: usize>(
async fn handle_ws<'a, const BUF_SIZE: usize>(
&'a mut self,
_path: &str,
mut ws: Ws<'a, BUF_SIZE, RES_HEAD_BUF_SIZE>,
mut ws: Ws<'a, BUF_SIZE>,
) {
Timer::after_millis(500).await;
let r: Result<(), ()> = try {

View File

@ -6,6 +6,8 @@
#![feature(try_blocks)]
#![feature(impl_trait_in_bindings)]
#![feature(array_repeat)]
#![feature(generic_arg_infer)]
#![feature(async_iterator)]
#[cfg(feature = "wifi-connect")]
use core::net::Ipv4Addr;

View File

@ -227,10 +227,7 @@ pub async fn listen_task(stack: embassy_net::Stack<'static>, mut app: impl apps:
}
}
if let Some(path) = ws_path {
app.handle_ws(
&path,
Ws::new(&mut socket, &mut buf, &mut head_buf, app.socket_name()),
)
app.handle_ws(&path, Ws::new(&mut socket, &mut buf, app.socket_name()))
.await;
}
}

View File

@ -1,4 +1,4 @@
use core::str::from_utf8;
use core::str::{from_utf8, from_utf8_unchecked};
use embassy_net::tcp::{TcpReader, TcpSocket, TcpWriter};
use embassy_time::{Instant, Timer};
@ -16,6 +16,10 @@ pub enum WsMsg<'a> {
Unknown(u8, &'a [u8]),
}
impl WsMsg<'_> {
const TEXT: u8 = 1;
const BYTES: u8 = 2;
const PING: u8 = 9;
const PONG: u8 = 10;
pub const fn len(&self) -> usize {
self.as_bytes().len()
}
@ -41,47 +45,73 @@ struct WsRx<'a, const BUF_SIZE: usize> {
buf: &'a mut [u8; BUF_SIZE],
msg_in_buf: Option<(usize, usize)>, // (start, length)
}
struct WsTx<'a, const HEAD_BUF_SIZE: usize> {
struct WsTx<'a> {
socket: TcpWriter<'a>,
head_buf: &'a mut Vec<u8, HEAD_BUF_SIZE>,
}
impl<'a, const HEAD_BUF_SIZE: usize> WsTx<'a, HEAD_BUF_SIZE> {
pub async fn send<'m>(&mut self, msg: WsMsg<'m>) -> Result<(), ()> {
self.head_buf.clear();
unwrap(self.head_buf.push(0b1000_0000 | msg.code())).await;
if msg.len() < 126 {
unwrap(self.head_buf.push(msg.len() as u8)).await;
impl<'a> WsTx<'a> {
pub async fn send_with<F: Fn(&mut [u8]) -> Result<usize, ()>>(
&mut self,
msg_code: u8,
f: F,
) -> Result<(), ()> {
if self.send_with_no_flush(msg_code, &f).await.is_err() {
self.socket.flush().await.map_err(|_| ())?;
self.send_with_no_flush(msg_code, f).await
} else {
unwrap(self.head_buf.push(0b0111_1110)).await;
unwrap(
self.head_buf
.extend_from_slice(&(msg.len() as u16).to_le_bytes()),
)
.await;
Ok(())
}
let w: Result<(), <TcpSocket<'_> as ErrorType>::Error> = try {
self.socket.write_all(&self.head_buf).await?;
self.socket.write_all(msg.as_bytes()).await?;
}
pub async fn send_with_no_flush<F: FnOnce(&mut [u8]) -> Result<usize, ()>>(
&mut self,
msg_code: u8,
f: F,
) -> Result<(), ()> {
self.socket
.write_with(|buf| {
buf[0] = 0b1000_0000 | msg_code;
let Ok(n) = f(&mut buf[4..]) else {
return (0, Err(()));
};
w.map_err(|e| {
warn!("write error: {:?}", e);
if n < 126 {
buf[1] = n as u8;
buf.copy_within(4..4 + n, 2);
(n + 2, Ok(()))
} else {
buf[1..=2].copy_from_slice(&(n as u16).to_le_bytes());
(n + 4, Ok(()))
}
})
.await
.map_err(|_| ())?
}
pub async fn send<'m>(&mut self, msg: WsMsg<'m>) -> Result<(), ()> {
self.send_with(msg.code(), |buf| {
let msg = msg.as_bytes();
if buf.len() < msg.len() {
Err(())
} else {
buf[..msg.len()].copy_from_slice(msg);
Ok(msg.len())
}
})
.await
}
pub async fn send_json<T: serde::Serialize>(&mut self, msg: &T) -> Result<(), ()> {
self.send_with(WsMsg::TEXT, |buf| {
serde_json_core::to_slice(msg, buf).map_err(|_| ())
})
.await
}
}
pub struct Ws<'a, const BUF_SIZE: usize = 1024, const RES_HEAD_BUF_SIZE: usize = 256> {
pub struct Ws<'a, const BUF_SIZE: usize = 1024> {
rx: WsRx<'a, BUF_SIZE>,
tx: WsTx<'a, RES_HEAD_BUF_SIZE>,
tx: WsTx<'a>,
pub last_msg: Instant,
name: &'a str,
}
impl<'a, const BUF_SIZE: usize, const HEAD_BUF_SIZE: usize> Ws<'a, BUF_SIZE, HEAD_BUF_SIZE> {
pub fn new(
socket: &'a mut TcpSocket,
buf: &'a mut [u8; BUF_SIZE],
head_buf: &'a mut Vec<u8, HEAD_BUF_SIZE>,
name: &'a str,
) -> Self {
impl<'a, const BUF_SIZE: usize> Ws<'a, BUF_SIZE> {
pub fn new(socket: &'a mut TcpSocket, buf: &'a mut [u8; BUF_SIZE], name: &'a str) -> Self {
let (rx, tx) = socket.split();
Self {
rx: WsRx {
@ -89,10 +119,7 @@ impl<'a, const BUF_SIZE: usize, const HEAD_BUF_SIZE: usize> Ws<'a, BUF_SIZE, HEA
buf,
msg_in_buf: None,
},
tx: WsTx {
socket: tx,
head_buf,
},
tx: WsTx { socket: tx },
last_msg: Instant::MIN,
name,
}
@ -228,4 +255,27 @@ impl<'a, const BUF_SIZE: usize, const HEAD_BUF_SIZE: usize> Ws<'a, BUF_SIZE, HEA
self.last_msg = Instant::now();
Ok(())
}
pub async fn send_json<T: serde::Serialize>(&mut self, msg: &T) -> Result<(), ()> {
self.tx.send_json(msg).await?;
self.last_msg = Instant::now();
Ok(())
}
pub async fn send_with<F: Fn(&mut [u8]) -> Result<usize, ()>>(
&mut self,
msg_code: u8,
f: F,
) -> Result<(), ()> {
self.tx.send_with(msg_code, f).await?;
self.last_msg = Instant::now();
Ok(())
}
pub async fn send_with_no_flush<F: FnOnce(&mut [u8]) -> Result<usize, ()>>(
&mut self,
msg_code: u8,
f: F,
) -> Result<(), ()> {
self.tx.send_with_no_flush(msg_code, f).await?;
self.last_msg = Instant::now();
Ok(())
}
}