How to configure the payload limit for an Actix WebSocket server

I am learning Actix and trying to create a WebSocket service.

Code snippets:

Starting the server:

pub async fn start(addr: &str) -> std::result::Result<(), IoError> {
    let connections = Connections::default().start();
    HttpServer::new(move || {
        App::new().service(
            web::resource("/ws/")
                .data(connections.clone())
                .route(web::get().to(ws_index)),
        )
    })
    .bind(addr)?
    .run()
    .await
}

Handler:

async fn ws_index(
    req: HttpRequest,
    stream: web::Payload,
    addr: web::Data<Addr<Connections>>,
) -> Result<HttpResponse, Error> {
    let id = Uuid::new_v4().to_simple().to_string();
    let client = Connection::new(id, addr.get_ref().clone());
    let resp = ws::start(client, &req, stream);
    resp
}

StreamHandler:

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Connection {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
            Ok(ws::Message::Pong(_)) => self.hb = Instant::now(),
            Ok(ws::Message::Text(text)) => ctx.text(text),
            Ok(ws::Message::Close(reason)) => {
                ctx.stop();
                println!("Connection {} closed with reason {:?}", self.id, reason);
            }
            Err(e) => println!("Error: {}", e),
            _ => (),
        }
    }
}

The WebSocket server now runs: it can receive a text message and send it back. But if I send a large text, the server logs "Error: A payload reached size limit." How can I fix this?



Solution 1:[1]

ws::start() creates the WebSocket context with the default codec, so instead of calling it you have to create the context with your own Codec. The codec is where the maximum payload size is set; see https://docs.rs/actix-http/2.2.0/actix_http/ws/struct.Codec.html.

So write your own start function:

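use actix::{Actor, StreamHandler};
use actix_http::ws::{Codec, Message, ProtocolError};
use actix_web::error::PayloadError;
use actix_web::{Error, HttpRequest, HttpResponse};
use actix_web_actors::ws::{handshake, WebsocketContext};
use bytes::Bytes;
use futures::Stream; // futures_core::Stream works as well

// Like ws::start(), but the caller supplies the codec (and with it the size limit).
fn start_with_codec<A, S>(actor: A, req: &HttpRequest, stream: S, codec: Codec) -> Result<HttpResponse, Error>
where
    A: Actor<Context = WebsocketContext<A>>
        + StreamHandler<Result<Message, ProtocolError>>,
    S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
    // Validate the upgrade request and build the 101 Switching Protocols response.
    let mut res = handshake(req)?;
    Ok(res.streaming(WebsocketContext::with_codec(actor, stream, codec)))
}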

Then change the handler to call the new function and pass it a codec:

async fn ws_index(
    req: HttpRequest,
    stream: web::Payload,
    addr: web::Data<Addr<Connections>>,
) -> Result<HttpResponse, Error> {
    let id = Uuid::new_v4().to_simple().to_string();
    let client = Connection::new(id, addr.get_ref().clone());
    let resp = start_with_codec(client, &req, stream, Codec::new().max_size(MAX_SIZE));
    resp
}
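
MAX_SIZE is not defined in the snippet above; it is a constant you choose yourself. Here is a minimal sketch, assuming a 1 MiB limit (the value is only an illustration). According to the Codec documentation the default maximum is 64 KiB, which would explain why large texts hit the limit. The fits_in_frame helper is hypothetical, not part of actix; it only illustrates my assumption that a payload is rejected once it is longer than the configured maximum.

```rust
/// Largest frame payload, in bytes, that the server should accept.
/// The 1 MiB value is an illustrative assumption; pick what your application needs.
pub const MAX_SIZE: usize = 1024 * 1024;

/// Default limit of the WebSocket codec (64 KiB), as stated in its documentation.
pub const DEFAULT_MAX_SIZE: usize = 64 * 1024;

/// Hypothetical helper (not an actix API): returns true when a payload of
/// `payload_len` bytes stays within `max_size`. It assumes the codec rejects
/// only payloads that are strictly longer than the limit.
pub fn fits_in_frame(payload_len: usize, max_size: usize) -> bool {
    payload_len <= max_size
}
```

With these values, a 100,000-byte text does not fit under the default limit but does fit under MAX_SIZE. Keep the limit as small as your use case allows, since every connection may buffer up to that many bytes.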

Update:

A large payload may be split into several smaller pieces, which arrive through the Message::Continuation variant.

This variant contains an Item enum with the following variants:

pub enum Item {
    FirstText(Bytes),
    FirstBinary(Bytes),
    Continue(Bytes),
    Last(Bytes),
}

To recompose the original message you have to collect all the pieces from the FirstText / FirstBinary until the Last, or be sure to forward all those messages in the proper order to the destination where the Message will be retrieved.
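
The collecting step can be sketched roughly as follows. To keep the example free of dependencies, it defines its own stand-in Item enum with Vec<u8> instead of Bytes; Reassembler, Complete, ReassembleError and the max_total cap are all hypothetical names of mine, not actix APIs. In a real handler you would feed the Item from Ok(ws::Message::Continuation(item)) into something like push().

```rust
/// Stand-in for actix_http::ws::Item (Vec<u8> instead of Bytes), so the
/// sketch compiles without external crates.
#[derive(Debug, Clone, PartialEq)]
pub enum Item {
    FirstText(Vec<u8>),
    FirstBinary(Vec<u8>),
    Continue(Vec<u8>),
    Last(Vec<u8>),
}

/// A fully reassembled message.
#[derive(Debug, PartialEq)]
pub enum Complete {
    Text(String),
    Binary(Vec<u8>),
}

#[derive(Debug, PartialEq)]
pub enum ReassembleError {
    /// Continue or Last arrived without a preceding First* fragment.
    NoMessageInProgress,
    /// A First* fragment arrived while another message was still open.
    AlreadyInProgress,
    /// The collected message would exceed the configured cap.
    TooLarge,
    /// A text message was not valid UTF-8 once all pieces were joined.
    InvalidUtf8,
}

enum Kind {
    Text,
    Binary,
}

/// Collects fragments from First* until Last.
pub struct Reassembler {
    in_progress: Option<(Kind, Vec<u8>)>,
    max_total: usize,
}

impl Reassembler {
    /// `max_total` caps the size of the whole reassembled message, in bytes.
    pub fn new(max_total: usize) -> Self {
        Self { in_progress: None, max_total }
    }

    /// Feeds one fragment; returns Ok(Some(_)) once Last completes a message.
    pub fn push(&mut self, item: Item) -> Result<Option<Complete>, ReassembleError> {
        match item {
            Item::FirstText(data) => self.begin(Kind::Text, data).map(|_| None),
            Item::FirstBinary(data) => self.begin(Kind::Binary, data).map(|_| None),
            Item::Continue(data) => self.append(&data).map(|_| None),
            Item::Last(data) => {
                self.append(&data)?;
                // append() succeeded, so a message is definitely in progress.
                let (kind, buf) = self.in_progress.take().expect("checked by append");
                match kind {
                    // Decode only now: a multi-byte character may be split
                    // across two fragments.
                    Kind::Text => String::from_utf8(buf)
                        .map(|s| Some(Complete::Text(s)))
                        .map_err(|_| ReassembleError::InvalidUtf8),
                    Kind::Binary => Ok(Some(Complete::Binary(buf))),
                }
            }
        }
    }

    fn begin(&mut self, kind: Kind, data: Vec<u8>) -> Result<(), ReassembleError> {
        if self.in_progress.is_some() {
            return Err(ReassembleError::AlreadyInProgress);
        }
        if data.len() > self.max_total {
            return Err(ReassembleError::TooLarge);
        }
        self.in_progress = Some((kind, data));
        Ok(())
    }

    fn append(&mut self, data: &[u8]) -> Result<(), ReassembleError> {
        let too_large = match &self.in_progress {
            None => return Err(ReassembleError::NoMessageInProgress),
            Some((_, buf)) => buf.len() + data.len() > self.max_total,
        };
        if too_large {
            self.in_progress = None; // drop the partial message
            return Err(ReassembleError::TooLarge);
        }
        if let Some((_, buf)) = self.in_progress.as_mut() {
            buf.extend_from_slice(data);
        }
        Ok(())
    }
}
```

The cap on the total size is a design choice of this sketch: the codec limit applies to individual frames, so without a separate cap a client could keep sending Continue fragments and grow the buffer without bound.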

Solution 2:[2]

As of actix-web v4, configuring a WebSocket is easy with WsResponseBuilder, whose frame_size() method sets the maximum frame size.

You can now do:

async fn ws_index(
    req: HttpRequest,
    stream: web::Payload,
    addr: web::Data<Addr<Connections>>,
) -> Result<HttpResponse, Error> {
    let id = Uuid::new_v4().to_simple().to_string();
    let client = Connection::new(id, addr.get_ref().clone());
    ws::WsResponseBuilder::new(client, &req, stream)
        .frame_size(MAX_SIZE)
        .start()
}

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Manstie