Bonjour, Habr! Cette fois, j'ai essayé de faire une simple discussion via Websockets. Pour plus de détails, bienvenue sous chat.
Contenu
- Apprendre Scala: Partie 1 - Jeu de serpent
- Learning Scala: Partie 2 - Feuille Todo avec téléchargement d'images
- Apprendre Scala: Partie 3 - Tests unitaires
- Apprendre Scala: Partie 4 - WebSocket
Liens
En fait, tout le code est dans un seul objet ChatHub
class ChatHub[F[_]] private(
val topic: Topic[F, WebSocketFrame],
private val ref: Ref[F, Int]
)
(
implicit concurrent: Concurrent[F],
timer: Timer[F]
) extends Http4sDsl[F] {
val endpointWs: ServerEndpoint[String, Unit, String, Stream[IO, WebSocketFrame], IO] = endpoint
.get
.in("chat")
.tag("WebSockets")
.summary(" . : ws://localhost:8080/chat")
.description(" ")
.in(
stringBody
.description(" ")
.example("!")
)
.out(
stringBody
.description(" - ")
.example("6 : Id f518a53d: !")
)
// .
.serverLogic(_ => IO(Left(()): Either[Unit, String]))
def routeWs: HttpRoutes[F] = {
HttpRoutes.of[F] {
case GET -> Root / "chat" => logic()
}
}
private def logic(): F[Response[F]] = {
val toClient: Stream[F, WebSocketFrame] =
topic.subscribe(1000)
val fromClient: Pipe[F, WebSocketFrame, Unit] =
handle
WebSocketBuilder[F].build(toClient, fromClient)
}
private def handle(s: Stream[F, WebSocketFrame]): Stream[F, Unit] = s
.collect({
case WebSocketFrame.Text(text, _) => text
})
.evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
.through(topic.publish)
}
object ChatHub {
def apply[F[_]]()(implicit concurrent: Concurrent[F], timer: Timer[F]): F[ChatHub[F]] = for {
ref <- Ref.of[F, Int](0)
topic <- Topic[F, WebSocketFrame](WebSocketFrame.Text("==="))
} yield new ChatHub(topic, ref)
}
Ici, vous devez immédiatement parler de Topic - une primitive de synchronisation de Fs2 qui vous permet de créer un modèle Éditeur - Abonné, et vous pouvez avoir plusieurs éditeurs et plusieurs abonnés en même temps. En général, il est préférable de lui envoyer des messages via une sorte de tampon comme Queue, car il a une limite sur le nombre de messages dans la file d'attente et Publisher attend que tous les abonnés reçoivent des messages dans leur file d'attente de messages et s'il est débordé, il peut se bloquer.
val topic: Topic[F, WebSocketFrame],
Ici, je compte également le nombre de messages envoyés au chat comme le nombre de chaque message. Comme je dois le faire à partir de différents threads, j'ai utilisé un analogue d'Atomic, qui s'appelle ici Ref et garantit l'atomicité de l'opération.
private val ref: Ref[F, Int]
Traitement d'un flux de messages d'utilisateurs.
private def handle(stream: Stream[F, WebSocketFrame]): Stream[F, Unit] =
stream
// .
.collect({
case WebSocketFrame.Text(text, _) => text
})
// .
.evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
//
.through(topic.publish)
En fait, la logique même de la création d'une socket.
private def logic(): F[Response[F]] = {
// .
val toClient: Stream[F, WebSocketFrame] =
//
topic.subscribe(1000)
//
val fromClient: Pipe[F, WebSocketFrame, Unit] =
//
handle
// .
WebSocketBuilder[F].build(toClient, fromClient)
}
Nous lions notre socket à la route sur le serveur (ws: // localhost: 8080 / chat)
def routeWs: HttpRoutes[F] = {
HttpRoutes.of[F] {
case GET -> Root / "chat" => logic()
}
}
En fait, c'est tout. Ensuite, vous pouvez démarrer le serveur avec cette route. Je voulais toujours faire tout type de documentation. En général, pour documenter WebSocket et d'autres interactions basées sur des événements comme RabbitMQ AMPQ, il y a AsynAPI, mais il n'y a rien sous Tapir, donc j'ai juste fait une description du point de terminaison pour Swagger comme une requête GET. Bien sûr, il ne fonctionnera pas. Plus précisément, une erreur 501 sera renvoyée, mais elle sera affichée dans Swagger
val endpointWs: Endpoint[String, Unit, String, fs2.Stream[F, Byte]] = endpoint
.get
.in("chat")
.tag("WebSockets")
.summary(" . : ws://localhost:8080/chat")
.description(" ")
.in(
stringBody
.description(" ")
.example("!")
)
.out(
stringBody
.description(" - ")
.example("6 : Id f518a53d: !")
)
Dans le swagger lui-même, cela ressemble à ceci: connectez
notre chat à notre serveur API
todosController = new TodosController()
imagesController = new ImagesController()
//
chatHub <- Resource.liftF(ChatHub[IO]())
endpoints = todosController.endpoints ::: imagesController.endpoints
// Swagger
docs = (chatHub.endpointWs :: endpoints).toOpenAPI("The Scala Todo List", "0.0.1")
yml: String = docs.toYaml
//
routes = chatHub.routeWs <+>
endpoints.toRoutes <+>
new SwaggerHttp4s(yml, "swagger").routes[IO]
httpApp = Router(
"/" -> routes
).orNotFound
blazeServer <- BlazeServerBuilder[IO](serverEc)
.bindHttp(settings.host.port, settings.host.host)
.withHttpApp(httpApp)
.resource
Nous nous connectons au chat avec un script extrêmement simple.
<script>
const id = `f${(~~(Math.random() * 1e8)).toString(16)}`;
const webSocket = new WebSocket('ws://localhost:8080/chat');
webSocket.onopen = event => {
alert('onopen ');
};
webSocket.onmessage = event => {
console.log(event);
receive(event.data);
};
webSocket.onclose = event => {
alert('onclose ');
};
function send() {
let text = document.getElementById("message");
webSocket.send(` Id ${id}: ${text.value}`);
text.value = '';
}
function receive(m) {
let text = document.getElementById("chat");
text.value = text.value + '\n\r' + m;
}
</script>
C'est en fait tout. J'espère que quelqu'un qui étudie également le rock trouvera cet article intéressant et peut-être même utile.