a lot of things + ws can rcv multiple msgs at once

This commit is contained in:
Arkitu 2025-08-01 02:35:25 +02:00
parent f136f55266
commit 3782686d4b
10 changed files with 283 additions and 212 deletions

View File

@ -11,8 +11,8 @@ wifi-connect = [
dhcp = ["dep:dhcparse"] dhcp = ["dep:dhcparse"]
dns = ["dep:dnsparse"] dns = ["dep:dnsparse"]
chat = ["dep:ringbuffer"] chat = ["dep:ringbuffer"]
ttt = [] ttt = ["dep:serde-json-core", "dep:serde"]
default = ["dhcp", "dns"] default = ["dhcp", "dns", "ttt"]
[dependencies] [dependencies]
embassy-executor = { git = "https://github.com/embassy-rs/embassy", features = [ embassy-executor = { git = "https://github.com/embassy-rs/embassy", features = [

View File

@ -1,4 +1,7 @@
use crate::socket::{HttpRequestType, HttpResCode}; use crate::{
apps::Content,
socket::{HttpRequestType, HttpResCode},
};
use super::App; use super::App;
@ -7,17 +10,29 @@ impl App for IndexApp {
fn socket_name(&self) -> &'static str { fn socket_name(&self) -> &'static str {
"index" "index"
} }
async fn handle_request<'a>( async fn handle_request(
&'a mut self, &mut self,
path: &str, path: &str,
_req_type: HttpRequestType, _req_type: HttpRequestType,
_content: &str, _content: &str,
) -> (HttpResCode, &'static str, &'a str) { ) -> (HttpResCode, &'static str, Option<Content<'_>>) {
match path { match path {
"/" | "/index" | "/index.html" => { "/" | "/index" | "/index.html" => (
(HttpResCode::Ok, "html", include_str!("./index.html")) HttpResCode::Ok,
} "html",
_ => (HttpResCode::NotFound, "", ""), Some(include_str!("./index.html").into()),
),
"/ttt" => (
HttpResCode::Ok,
"html",
Some(include_str!("ttt.html").into()),
),
"/ttt.js" => (
HttpResCode::Ok,
"javascript",
Some(include_str!("ttt.js").into()),
),
_ => (HttpResCode::NotFound, "", None),
} }
} }
} }

View File

@ -1,3 +1,5 @@
use heapless::Vec;
use crate::socket::{HttpRequestType, HttpResCode, ws::Ws}; use crate::socket::{HttpRequestType, HttpResCode, ws::Ws};
#[cfg(feature = "chat")] #[cfg(feature = "chat")]
@ -10,11 +12,11 @@ pub trait App {
fn socket_name(&self) -> &'static str; fn socket_name(&self) -> &'static str;
async fn handle_request<'a>( async fn handle_request<'a>(
&'a mut self, &'a mut self,
_path: &str, _path: &'a str,
_req_type: HttpRequestType, _req_type: HttpRequestType,
_content: &str, _content: &'a str,
) -> (HttpResCode, &'static str, &'a str) { ) -> (HttpResCode, &'static str, Option<Content<'a>>) {
(HttpResCode::NotFound, "", "") (HttpResCode::NotFound, "", None)
} }
fn accept_ws(&self, _path: &str) -> bool { fn accept_ws(&self, _path: &str) -> bool {
false false
@ -26,3 +28,24 @@ pub trait App {
) { ) {
} }
} }
pub struct Content<'a>(pub Vec<&'a str, 8>);
// pub enum Content<'a> {
// Str(&'a str),
// /// Return the number of bytes written
// /// (fn that writes content, length)
// Fn(fn(&mut [u8]) -> usize, usize),
// }
impl<'a> From<&'a str> for Content<'a> {
fn from(value: &'a str) -> Self {
let mut v = Vec::new();
v.push(value).unwrap();
Content(v)
}
}
impl Content<'_> {
pub fn len(&self) -> usize {
self.0.iter().fold(0, |acc, s| acc + s.len())
}
}

View File

@ -1,6 +1,6 @@
<!doctype html> <!doctype html>
<head> <head>
<script src="./htmx.js"></script> <!-- <script src="./htmx.js"></script> -->
<style type="text/css"> <style type="text/css">
body { body {
#grid { #grid {
@ -8,6 +8,12 @@
border: 1px dotted black; border: 1px dotted black;
padding: 33%; padding: 33%;
} }
.cell[team="0"] {
background-color: dodgerblue;
}
.cell[team="1"] {
background-color: firebrick;
}
display: grid; display: grid;
border: 1px solid black; border: 1px solid black;
grid-template-rows: 1fr 1fr 1fr; grid-template-rows: 1fr 1fr 1fr;
@ -15,16 +21,24 @@
} }
} }
</style> </style>
<script src="/ttt.js" defer></script>
</head> </head>
<html> <html>
<body> <body>
<h1>TicTacToe</h1> <h1>TicTacToe</h1>
<div <h3 id="team"></h3>
id="game" <!-- <div class="cell" style="background-color:"></div> -->
hx-get="/ttt/initial_game" <div id="grid">
hx-swap="outerHTML" <div class="cell" id="cell0"></div>
hx-trigger="load" <div class="cell" id="cell1"></div>
hx-target="this" <div class="cell" id="cell2"></div>
></div> <div class="cell" id="cell3"></div>
<div class="cell" id="cell4"></div>
<div class="cell" id="cell5"></div>
<div class="cell" id="cell6"></div>
<div class="cell" id="cell7"></div>
<div class="cell" id="cell8"></div>
<div class="cell" id="cell9"></div>
</div>
</body> </body>
</html> </html>

52
src/apps/ttt.js Normal file
View File

@ -0,0 +1,52 @@
const team = 0;
const teamName = "blue";
const color = "dodgerblue";
const otherColor = "firebrick";
document.getElementById("team").innerHTML =
'Team : <span style="color:' + color + '">' + teamName + "</span>";
const ws = new WebSocket("ws://192.254.0.2:8080/" + teamName);
ws.onmessage = (event) => {
console.log(event.data);
if (typeof event.data == "string") {
let msg = JSON.parse(event.data);
let cells = [];
for (let i = 0; i < 9; i++) {
let owner = null;
if (((msg.board >> (17 - i)) & 1) == 1) {
owner = 0;
} else if (((msg.board >> (8 - i)) & 1) == 1) {
owner = 1;
}
let tagName;
if (msg.turn == team && owner === null) {
tagName = "button";
} else {
tagName = "div";
}
let cell = document.createElement(tagName);
cell.classList.add("cell");
if (tagName === "button") {
cell.addEventListener("click", (event) => {
console.log(i);
ws.send(new Uint8Array([i]));
});
}
cell.setAttribute("team", owner);
// if (msg.board & (1 << i != 0) || msg.board & (1 << (i + 9))) {
// if (msg.board & (1 << (i + 9) != 0)) {
// col = "firebrick";
// }
// }
cells.push(cell);
}
document.getElementById("grid").replaceChildren(...cells);
}
};

View File

@ -1,31 +1,32 @@
use core::fmt::Write; use core::fmt::Write;
use core::{ops::Not, sync::atomic::Ordering}; use core::{ops::Not, sync::atomic::Ordering};
use embassy_time::{Duration, Instant}; use embassy_time::{Duration, Instant, Timer};
use heapless::String; use heapless::{String, Vec};
use log::info;
use pico_website::unwrap; use pico_website::unwrap;
use portable_atomic::{AtomicBool, AtomicU32}; use portable_atomic::{AtomicBool, AtomicU32};
use serde::Serialize;
use crate::apps::Content;
use crate::socket::ws::{Ws, WsMsg};
use crate::socket::{HttpRequestType, HttpResCode}; use crate::socket::{HttpRequestType, HttpResCode};
use super::App; use super::App;
static TURN: AtomicBool = AtomicBool::new(false); static TURN: AtomicBool = AtomicBool::new(false);
// bits [0; 8] : player zero board / bits [9; 17] : player one board // bits [0; 8] : player zero board / bits [9; 17] : player one board / is_ended [18] / is_draw [19] / winner [20]: 0=blue 1=green / current_turn [21]: 0=blue 1=green
static BOARD: AtomicU32 = AtomicU32::new(0); static BOARD: AtomicU32 = AtomicU32::new(0b01000000_001000000);
pub struct TttApp { pub struct TttApp {
res_buf: String<2048>,
/// State of the board last time it has been sent
last_board: u32,
team: Team, team: Team,
last_board: u32,
end: Option<(Instant, Option<Team>)>, end: Option<(Instant, Option<Team>)>,
} }
impl TttApp { impl TttApp {
pub fn new(team: Team) -> Self { pub fn new(team: Team) -> Self {
Self { Self {
res_buf: String::new(),
last_board: 0,
team, team,
last_board: 0,
end: None, end: None,
} }
} }
@ -67,83 +68,6 @@ impl TttApp {
} }
} }
} }
/// Generate board html
async fn generate_board_res<'a>(
&'a mut self,
board: u32,
turn: Team,
outer_html: bool,
) -> &'a str {
self.res_buf.clear();
if outer_html {
unwrap(self.res_buf.push_str(
"<div \
id=\"game\" \
hx-get=\"/ttt/game\" \
hx-swap=\"innerHTML\" \
hx-trigger=\"every 100ms\" \
hx-target=\"this\"\
>",
))
.await;
}
unwrap(write!(
self.res_buf,
"<h3>Team : <span style=\"color:{}\">{}</span></h3>",
self.team.color(),
self.team.name()
))
.await;
match self.end {
Some((_, Some(t))) => {
unwrap(write!(
self.res_buf,
"<br><h3>Team <span style=\"color:{}\">{}</span> has won!</h3><br>",
t.color(),
t.name()
))
.await
}
Some((_, None)) => unwrap(write!(self.res_buf, "<br><h3>Draw!</h3><br>",)).await,
None => {}
}
unwrap(self.res_buf.push_str("<div id=\"grid\">")).await;
for c in 0..=8 {
let picked_by = if board & (1 << c) != 0 {
Some(Team::Zero)
} else if board & (1 << (9 + c)) != 0 {
Some(Team::One)
} else {
None
};
match picked_by {
Some(t) => {
unwrap(write!(
self.res_buf,
"<div class=\"cell\" style=\"background-color:{}\"></div>",
t.color()
))
.await;
}
None => {
if self.team == turn.into() && self.end.is_none() {
unwrap(write!(
self.res_buf,
"<button class=\"cell\" hx-post=\"/ttt/cell{}\" hx-trigger=\"click\" hx-target=\"#game\" hx-swap=\"innerHTML\"></button>",
c
)).await;
} else {
unwrap(self.res_buf.push_str("<div class=\"cell\"></div>")).await;
}
}
};
}
unwrap(self.res_buf.push_str("</div>")).await;
if outer_html {
unwrap(self.res_buf.push_str("</div>")).await;
}
&self.res_buf
}
} }
impl App for TttApp { impl App for TttApp {
@ -155,61 +79,54 @@ impl App for TttApp {
path: &str, path: &str,
_req_type: HttpRequestType, _req_type: HttpRequestType,
_content: &str, _content: &str,
) -> (HttpResCode, &'static str, &'a str) { ) -> (HttpResCode, &'static str, Option<Content<'a>>) {
match path { match path {
"/" | "/index" | "/index.html" | "/ttt" | "/ttt.html" => { "/" | "/index" | "/index.html" | "/ttt" | "/ttt.html" => (
(HttpResCode::Ok, "html", include_str!("./ttt.html"))
}
"/ttt/initial_game" => {
let board = BOARD.load(Ordering::Acquire);
let turn = TURN.load(Ordering::Acquire);
(
HttpResCode::Ok, HttpResCode::Ok,
"html", "html",
self.generate_board_res(board, turn.into(), true).await, Some(include_str!("ttt.html").into()),
) ),
_ => (HttpResCode::NotFound, "", None),
} }
path => { }
if (path.starts_with("/ttt/cell") && path.len() == 10) || path == "/ttt/game" { fn accept_ws(&self, path: &str) -> bool {
let mut board = BOARD.load(Ordering::Acquire); matches!(path, "/blue" | "/red")
let mut turn = TURN.load(Ordering::Acquire); }
async fn handle_ws<'a, const BUF_SIZE: usize, const RES_HEAD_BUF_SIZE: usize>(
// just return correct board in case of unauthorized move &'a mut self,
if path.starts_with("/ttt/cell") && self.team == turn.into() { _path: &str,
let clicked_c: Cell = match TryInto::<Cell>::try_into( mut ws: Ws<'a, BUF_SIZE, RES_HEAD_BUF_SIZE>,
unwrap(path.chars().nth(9).ok_or("no 9th char")).await,
) { ) {
Ok(c) => c, let r: Result<(), ()> = try {
Err(_) => return (HttpResCode::NotFound, "", ""), loop {
let board = BOARD.load(Ordering::Acquire);
ws.send(WsMsg::Text(
&serde_json_core::to_string::<_, 40>(&ServerMsg {
board,
turn: Some(Team::Zero),
winner: None,
})
.unwrap(),
))
.await?;
while let Some(r) = ws.rcv().await? {
info!("{:?}", r);
}
Timer::after_secs(1).await;
}
}; };
if board & ((1 << (clicked_c as u32)) + (1 << (9 + clicked_c as u32))) != 0 info!("{:?}", r);
{
return (HttpResCode::Forbidden, "", "");
}
board = board | (1 << ((self.team as u32 * 9) + clicked_c as u32));
turn = (!self.team).into();
BOARD.store(board, Ordering::Release);
TURN.store(turn, Ordering::Release);
}
self.update_end_state(&mut board);
if self.last_board != board {
self.last_board = board;
(
HttpResCode::Ok,
"html",
self.generate_board_res(board, turn.into(), false).await,
)
} else {
(HttpResCode::NoContent, "", "")
}
} else {
(HttpResCode::NotFound, "", "")
}
}
}
} }
} }
#[derive(Debug, Serialize)]
struct ServerMsg {
board: u32,
turn: Option<Team>,
winner: Option<Team>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Team { pub enum Team {
Zero = 0, Zero = 0,
@ -237,6 +154,14 @@ impl Not for Team {
} }
} }
} }
impl Serialize for Team {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_u8(*self as u8)
}
}
impl Team { impl Team {
fn color(self) -> &'static str { fn color(self) -> &'static str {
match self { match self {

View File

@ -7,12 +7,9 @@ use log::info;
pub async fn unwrap<T, E: Debug>(res: Result<T, E>) -> T { pub async fn unwrap<T, E: Debug>(res: Result<T, E>) -> T {
match res { match res {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => loop {
info!("FATAL ERROR : {:?}", e);
loop {
info!("FATAL ERROR : {:?}", e); info!("FATAL ERROR : {:?}", e);
Timer::after_secs(5).await; Timer::after_secs(5).await;
} },
}
} }
} }

View File

@ -4,6 +4,7 @@
#![feature(impl_trait_in_assoc_type)] #![feature(impl_trait_in_assoc_type)]
#![feature(slice_split_once)] #![feature(slice_split_once)]
#![feature(try_blocks)] #![feature(try_blocks)]
#![feature(impl_trait_in_bindings)]
#[cfg(feature = "wifi-connect")] #[cfg(feature = "wifi-connect")]
use core::net::Ipv4Addr; use core::net::Ipv4Addr;

View File

@ -8,6 +8,7 @@ use heapless::{String, Vec};
use log::{info, warn}; use log::{info, warn};
use sha1::{Digest, Sha1}; use sha1::{Digest, Sha1};
use crate::apps::Content;
use crate::{apps, socket::ws::Ws}; use crate::{apps, socket::ws::Ws};
pub mod ws; pub mod ws;
@ -55,6 +56,7 @@ pub async fn listen_task(stack: embassy_net::Stack<'static>, mut app: impl apps:
socket.remote_endpoint() socket.remote_endpoint()
); );
let mut ws_path: Option<String<16>> = None;
loop { loop {
Timer::after_secs(0).await; Timer::after_secs(0).await;
let n = match socket.read(&mut buf).await { let n = match socket.read(&mut buf).await {
@ -145,7 +147,7 @@ pub async fn listen_task(stack: embassy_net::Stack<'static>, mut app: impl apps:
Timer::after_secs(0).await; Timer::after_secs(0).await;
head_buf.clear(); head_buf.clear();
let res_content: Result<&str, core::fmt::Error> = try { let res_content: Result<Option<Content>, core::fmt::Error> = try {
if ws_handshake { if ws_handshake {
if !app.accept_ws(path) { if !app.accept_ws(path) {
write!( write!(
@ -153,7 +155,7 @@ pub async fn listen_task(stack: embassy_net::Stack<'static>, mut app: impl apps:
"{}\r\n\r\n", "{}\r\n\r\n",
Into::<&str>::into(HttpResCode::NotFound) Into::<&str>::into(HttpResCode::NotFound)
)?; )?;
"" None
} else { } else {
if path.len() > 16 { if path.len() > 16 {
warn!("Ws socket cannot have path longer than 16 chars!"); warn!("Ws socket cannot have path longer than 16 chars!");
@ -180,31 +182,31 @@ pub async fn listen_task(stack: embassy_net::Stack<'static>, mut app: impl apps:
Into::<&str>::into(HttpResCode::SwitchingProtocols), Into::<&str>::into(HttpResCode::SwitchingProtocols),
accept accept
)?; )?;
"" None
} }
} else { } else {
let (code, res_type, res_content): (HttpResCode, &str, &str) = match path { let (code, res_type, res_content) = match path {
"/htmx.js" => ( "/htmx.js" => (
HttpResCode::Ok, HttpResCode::Ok,
"javascript", "javascript",
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
include_str!("../static/htmx.js"), Some(include_str!("../static/htmx.js").into()),
#[cfg(not(debug_assertions))] #[cfg(not(debug_assertions))]
include_bytes!("../static/htmx.min.js"), Some(include_bytes!("../static/htmx.min.js").into()),
), ),
_ => app.handle_request(path, request_type, content).await, _ => app.handle_request(path, request_type, content).await,
}; };
write!(&mut head_buf, "{}", Into::<&str>::into(code))?; write!(&mut head_buf, "{}", Into::<&str>::into(code))?;
if res_type.len() > 0 { if let Some(ref c) = res_content {
write!( write!(
&mut head_buf, &mut head_buf,
"\r\n\ "\r\n\
Content-Type: text/{}\r\n\ Content-Type: text/{}\r\n\
Content-Length: {}\r\n", Content-Length: {}\r\n",
res_type, res_type,
res_content.len() c.len()
)?; )?
} }
write!(&mut head_buf, "\r\n\r\n")?; write!(&mut head_buf, "\r\n\r\n")?;
res_content res_content
@ -221,30 +223,32 @@ pub async fn listen_task(stack: embassy_net::Stack<'static>, mut app: impl apps:
info!("\n{}\n", from_utf8(&head_buf).unwrap()); info!("\n{}\n", from_utf8(&head_buf).unwrap());
match socket.write_all(&head_buf).await { let w: Result<(), embassy_net::tcp::Error> = try {
Ok(()) => {} socket.write_all(&head_buf).await?;
Err(e) => { if let Some(ref c) = res_content {
warn!("write error: {:?}", e); for s in c.0.iter() {
break; socket.write_all(s.as_bytes()).await?;
} }
}; } else {
match socket.write_all(res_content.as_bytes()).await {
Ok(()) => {}
Err(e) => {
warn!("write error: {:?}", e);
break;
} }
}; };
if let Err(e) = w {
warn!("write error: {:?}", e);
break;
};
if ws_handshake { if ws_handshake {
let path: String<16> = String::from_str(path).unwrap(); ws_path = Some(String::from_str(path).unwrap());
break;
}
}
if let Some(path) = ws_path {
app.handle_ws( app.handle_ws(
&path, &path,
Ws::new(&mut socket, &mut buf, &mut head_buf, app.socket_name()), Ws::new(&mut socket, &mut buf, &mut head_buf, app.socket_name()),
) )
.await; .await;
break;
}
} }
} }
} }

View File

@ -1,12 +1,12 @@
use core::str::from_utf8; use core::str::from_utf8;
use embassy_net::tcp::{TcpReader, TcpSocket, TcpWriter}; use embassy_net::tcp::{TcpReader, TcpSocket, TcpWriter};
use embassy_time::Instant; use embassy_time::{Instant, Timer};
use embedded_io_async::{ErrorType, ReadReady, Write}; use embedded_io_async::{ErrorType, ReadReady, Write};
use heapless::Vec; use heapless::Vec;
use log::{info, warn}; use log::{info, warn};
#[derive(Clone, Copy)] #[derive(Clone, Copy, Debug)]
pub enum WsMsg<'a> { pub enum WsMsg<'a> {
Ping(&'a [u8]), Ping(&'a [u8]),
Pong(&'a [u8]), Pong(&'a [u8]),
@ -39,6 +39,7 @@ struct WsRx<'a, const BUF_SIZE: usize> {
socket: TcpReader<'a>, socket: TcpReader<'a>,
buf: &'a mut [u8; BUF_SIZE], buf: &'a mut [u8; BUF_SIZE],
last_msg: Instant, last_msg: Instant,
msg_in_buf: Option<(usize, usize)>, // (start, length)
} }
struct WsTx<'a, const HEAD_BUF_SIZE: usize> { struct WsTx<'a, const HEAD_BUF_SIZE: usize> {
socket: TcpWriter<'a>, socket: TcpWriter<'a>,
@ -86,6 +87,7 @@ impl<'a, const BUF_SIZE: usize, const HEAD_BUF_SIZE: usize> Ws<'a, BUF_SIZE, HEA
socket: rx, socket: rx,
buf, buf,
last_msg: Instant::MIN, last_msg: Instant::MIN,
msg_in_buf: None,
}, },
tx: WsTx { tx: WsTx {
socket: tx, socket: tx,
@ -95,27 +97,52 @@ impl<'a, const BUF_SIZE: usize, const HEAD_BUF_SIZE: usize> Ws<'a, BUF_SIZE, HEA
} }
} }
// Do this often to respond to pings // Do this often to respond to pings
async fn rcv(&mut self) -> Result<Option<WsMsg>, ()> { pub async fn rcv(&mut self) -> Result<Option<WsMsg>, ()> {
if !self.rx.socket.read_ready().unwrap() { let n = match self.rx.msg_in_buf.take() {
return Ok(None); Some(n) => {
} self.rx.buf.copy_within(n.0..n.0 + n.1, 0);
let n = match self.rx.socket.read(self.rx.buf).await { if self.rx.socket.read_ready().unwrap() {
let n_rcv = match self.rx.socket.read(&mut self.rx.buf[n.1..]).await {
Ok(0) => { Ok(0) => {
warn!("read EOF"); info!("read EOF");
return Err(()); return Err(());
} }
Ok(n) => n, Ok(n) => n,
Err(e) => { Err(e) => {
warn!("Socket {}: read error: {:?}", self.name, e); info!("Socket {}: read error: {:?}", self.name, e);
return Err(()); return Err(());
} }
}; };
n.1 + n_rcv
} else {
n.1
}
}
None => {
if self.rx.socket.read_ready().unwrap() {
match self.rx.socket.read(self.rx.buf).await {
Ok(0) => {
info!("read EOF");
return Err(());
}
Ok(n) => n,
Err(e) => {
info!("Socket {}: read error: {:?}", self.name, e);
return Err(());
}
}
} else {
return Ok(None);
}
}
};
if self.rx.buf[0] & 0b1000_0000 == 0 { if self.rx.buf[0] & 0b1000_0000 == 0 {
warn!("Fragmented ws messages are not supported!"); info!("Fragmented ws messages are not supported!");
return Err(()); return Err(());
} }
if self.rx.buf[0] & 0b0111_0000 != 0 { if self.rx.buf[0] & 0b0111_0000 != 0 {
warn!( info!(
"Reserved ws bits are set : {}", "Reserved ws bits are set : {}",
(self.rx.buf[0] >> 4) & 0b0111 (self.rx.buf[0] >> 4) & 0b0111
); );
@ -133,14 +160,14 @@ impl<'a, const BUF_SIZE: usize, const HEAD_BUF_SIZE: usize> Ws<'a, BUF_SIZE, HEA
l => (l as u64, 2), l => (l as u64, 2),
}; };
if length > 512 { if length > 512 {
warn!("ws payload bigger than 512!"); info!("ws payload bigger than 512!");
return Err(()); return Err(());
} }
let content = if self.rx.buf[1] & 0b1000_0000 != 0 { let content = if self.rx.buf[1] & 0b1000_0000 != 0 {
// masked message // masked message
if n_after_length + 4 + length as usize > n { if n_after_length + 4 + length as usize > n {
warn!("ws payload smaller than length"); info!("ws payload smaller than length");
return Err(()); return Err(());
} }
let mask_key: [u8; 4] = self.rx.buf[n_after_length..n_after_length + 4] let mask_key: [u8; 4] = self.rx.buf[n_after_length..n_after_length + 4]
@ -152,12 +179,24 @@ impl<'a, const BUF_SIZE: usize, const HEAD_BUF_SIZE: usize> Ws<'a, BUF_SIZE, HEA
{ {
*x ^= mask_key[i & 0xff]; *x ^= mask_key[i & 0xff];
} }
if n_after_length + 4 + (length as usize) < n {
self.rx.msg_in_buf = Some((
n_after_length + 4 + (length as usize),
(n - (n_after_length + 4 + (length as usize))),
));
}
&self.rx.buf[n_after_length + 4..n_after_length + 4 + length as usize] &self.rx.buf[n_after_length + 4..n_after_length + 4 + length as usize]
} else { } else {
if n_after_length + length as usize > n { if n_after_length + length as usize > n {
warn!("ws payload smaller than length"); info!("ws payload smaller than length");
return Err(()); return Err(());
} }
if n_after_length + (length as usize) < n {
self.rx.msg_in_buf = Some((
n_after_length + (length as usize),
(n - (n_after_length + (length as usize))),
));
}
&self.rx.buf[n_after_length..n_after_length + length as usize] &self.rx.buf[n_after_length..n_after_length + length as usize]
}; };
self.rx.last_msg = Instant::now(); self.rx.last_msg = Instant::now();
@ -165,9 +204,10 @@ impl<'a, const BUF_SIZE: usize, const HEAD_BUF_SIZE: usize> Ws<'a, BUF_SIZE, HEA
// Text message // Text message
1 => { 1 => {
let content = from_utf8(&content).map_err(|_| ())?; let content = from_utf8(&content).map_err(|_| ())?;
info!("Received text : {:?}", content);
Ok(Some(WsMsg::Text(content))) Ok(Some(WsMsg::Text(content)))
} }
// Bytes
2 => Ok(Some(WsMsg::Bytes(content))),
// Ping // Ping
9 => { 9 => {
self.tx.send(WsMsg::Pong(&content)).await?; self.tx.send(WsMsg::Pong(&content)).await?;
@ -181,7 +221,7 @@ impl<'a, const BUF_SIZE: usize, const HEAD_BUF_SIZE: usize> Ws<'a, BUF_SIZE, HEA
} }
} }
} }
pub async fn send(&mut self, msg: WsMsg<'a>) -> Result<(), ()> { pub async fn send(&mut self, msg: WsMsg<'_>) -> Result<(), ()> {
self.tx.send(msg).await self.tx.send(msg).await
} }
} }