'How use postgres (deadpool-postgres) with WebSocket Actix (actix-web-actors)

How to use database connection from a WebSocket handler.

It is possible to use future.into_actor or ctx.spawn or actix::fut::wrap_future but that's not accurate :)

In theory, I should not block the ws handler when making a request to the database. Do I need to somehow send the request to some executor in a separate thread?

Didn't find any information or example of using database with websockets.

type PgPool = deadpool_r2d2::Pool<deadpool_postgres::Manager>;

pub struct MyWebSocket {
    db: deadpool::managed::Object<deadpool_postgres::Manager>    
}

impl MyWebSocket {
    pub fn new(client_db: deadpool::managed::Object<deadpool_postgres::Manager>) -> Self {
        Self { db:client_db }
    }
}

impl Actor for MyWebSocket {
    type Context = ws::WebsocketContext<Self>;
}

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
         Ok(ws::Message::Text(text)) => {

             /*
               How used this database?
               Error: only allowed inside `async` functions and blocks**
             */

             let stmt = self.db.prepare_cached("SELECT 1 + $1").await.unwrap();
             let rows = self.db.query(&stmt, &[&3]).await.unwrap();
             let value: i32 = rows[0].get(0);
             ctx.text(format!("{}",value));

         },
         ....
        }
    }
}
fn create_pool(max_size: usize) -> PgPool {
    let config:tokio_postgres::Config = config().expect("Error configure");
    let mgr_config = ManagerConfig {
        recycling_method: RecyclingMethod::Fast
    };
    let mgr:deadpool_postgres::Manager = Manager::from_config(config, NoTls, mgr_config);
    let pool:deadpool_r2d2::Pool<deadpool_postgres::Manager> = 
        Pool::builder(mgr).runtime(deadpool_postgres::Runtime::Tokio1).max_size(max_size).build().unwrap();
    pool
} 

#[get("ws/")]
async fn ws_index(req: HttpRequest, stream: web::Payload,db_pool: web::Data<PgPool>) -> Result<HttpResponse, Error> {
    let client:deadpool::managed::Object<deadpool_postgres::Manager> = db_pool.get().await.unwrap();
    let resp = ws::start(MyWebSocket::new(client), &req, stream);
    resp
}

#[actix_web::main]
async fn main() -> std::io::Result<()> { 
    let pool:PgPool = create_pool(2);
    HttpServer::new(move|| {
        App::new()
            .app_data(web::Data::new(pool.clone()))
            .wrap( middleware::DefaultHeaders::new().header("Access-Control-Allow-Origin", "*"))
            .service(ws_index)
            .wrap(middleware::Logger::default())
    })
    .workers(2)
    .bind(("0.0.0.0", 4011))?
    .run()
    .await
}

Console:

error[E0728]: `await` is only allowed inside `async` functions and blocks
   --> src/test_db.rs:89:33
    |
67  | /     fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
68  | |         
69  | |         
70  | |         match msg {
...   |
89  | |                      let stmt = self.db.prepare_cached("SELECT 1 + $1").await.unwrap();
    | |                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks
...   |
156 | |         }
157 | |     }
    | |_____- this is not `async`

error[E0728]: `await` is only allowed inside `async` functions and blocks
   --> src/test_db.rs:90:33
    |
67  | /     fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
68  | |         
69  | |         
70  | |         match msg {
...   |
90  | |                      let rows = self.db.query(&stmt, &[&3]).await.unwrap();
    | |                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks
...   |
156 | |         }
157 | |     }
    | |_____- this is not `async`


Solution 1:[1]

You should be able to use WebsocketContext::spawn

let db = self.db.clone();
let fut = async move {
   let stmt = db.prepare_cached("SELECT 1 + $1").await.unwrap();
   let rows = db.query(&stmt, &[&3]).await.unwrap();
   let value: i32 = rows[0].get(0);
};
let fut = actix::fut::wrap_future::<_, Self>(fut);
ctx.spawn(fut);

On the question in the comment:

How to send back the result of the database operation? Use Shared State or Channels?

The problem you are facing is you cant move ctx. A simple approach is to get the address of the actor from ctx which is clone and you can move it. This approach requires an extra struct and an implementation of Handler.

struct WsMessage(String)

impl Handler<WsMessage> for MyWebSocket {
    type Result = ();

    fn handle(&mut self, msg: WsMessage, ctx: &mut Context<Self>) -> Self::Result {
        ctx.text(msg.0);
    }
}

So now you can do:

let db = self.db.clone();
let addr = ctx.addess();
let fut = async move {
   let stmt = db.prepare_cached("SELECT 1 + $1").await.unwrap();
   let rows = db.query(&stmt, &[&3]).await.unwrap();
   let value: i32 = rows[0].get(0);
   addr.send(WsMessage(String::from(value))).await.unwrap();
};
let fut = actix::fut::wrap_future::<_, Self>(fut);
ctx.spawn(fut);

Edit

Come to think of it, the above approach is a little too much.

Having a look at ActorFuture you should be able to do:

let db = self.db.clone();

let fut = async move {
   let stmt = db.prepare_cached("SELECT 1 + $1").await.unwrap();
   let rows = db.query(&stmt, &[&3]).await.unwrap();
   let value: i32 = rows[0].get(0);
   value
};
let fut = actix::fut::wrap_future::<_, Self>(fut);
let fut = fut.map(|result, actor, ctx| {
   ctx.text(result.to_string());
});
ctx.spawn(fut)

Solution 2:[2]

It is impossible but you can use the tokio spawn task method and inside the closure you use async await.

Something like this.

tokio::spawn(async move {
    let stmt = self.db.prepare_cached("SELECT 1 + $1").await.unwrap();
    let rows = self.db.query(&stmt, &[&3]).await.unwrap();
    let value: i32 = rows[0].get(0);
    ctx.text(format!("{}",value));
});

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2