Consume TCP stream and redirect it to another Sink (with Akka Streams)
I am trying to redirect/forward a TCP stream to another Sink with Akka 2.4.3. The program should open a server socket, listen for incoming connections, and then consume the TCP stream. Our sender does not expect or accept replies from us, so we never send anything back; we just consume the stream. After framing the TCP stream, we need to transform the bytes into something more useful and send it to the Sink.
This is what I have tried so far. I struggle especially with two parts: how to avoid sending TCP packets back to the sender, and how to properly connect the Sink.
import scala.util.Failure
import scala.util.Success
import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Tcp
import akka.stream.scaladsl.Framing
import akka.util.ByteString
import java.nio.ByteOrder
import akka.stream.scaladsl.Flow

object TcpConsumeOnlyStreamToSink {
  implicit val system = ActorSystem("stream-system")
  private val log = Logging(system, getClass.getName)

  //The Sink
  //In reality this is of course a real Sink doing some useful things :-)
  //The Sink accepts elements of type "SomethingMySinkUnderstand"
  val mySink = Sink.ignore

  def main(args: Array[String]): Unit = {
    //our sender is not interested in getting replies from us
    //so we just want to consume the tcp stream and never send back anything to the sender
    val (address, port) = ("127.0.0.1", 6000)
    server(system, address, port)
  }

  def server(system: ActorSystem, address: String, port: Int): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
      println("Client connected from: " + conn.remoteAddress)
      conn handleWith Flow[ByteString]
        //this is necessary since we use a self-developed tcp wire protocol
        .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN))
        //here we want to map the raw bytes into something our Sink understands
        .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
        //here we would like to connect our Sink to the Tcp Source
        .to(mySink) //<------ NOT COMPILING
    }

    val tcpSource = Tcp().bind(address, port)
    val binding = tcpSource.to(handler).run()

    binding.onComplete {
      case Success(b) =>
        println("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        println(s"Server could not bind to $address:$port: ${e.getMessage}")
        system.terminate()
    }
  }

  class SomethingMySinkUnderstand(x: String) {
  }
}
Update: Add this to your build.sbt file to get the necessary dependencies:
libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"
Solution 1:[1]
`handleWith` expects a `Flow`, i.e. a box with an unconnected inlet and an unconnected outlet. You effectively provide a `Sink`, because connecting the `Flow` to a `Sink` with the `to` operation closes the outlet and leaves only an unconnected inlet.
I think you could do the following:
conn.handleWith(
  Flow[ByteString]
    .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN))
    .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
    .alsoTo(mySink)
    .map(_ => ByteString.empty)
    .filter(_ => false) // Prevents sending anything back
)
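One detail worth checking in this pipeline: as far as I can tell, `Framing.lengthField` emits each frame with its length field still attached, so `msg.utf8String` would also decode the four header bytes. The plain-Scala sketch below (no Akka involved) illustrates the idea on raw byte arrays. The object and helper names are hypothetical, and it assumes the length field counts only the payload bytes, which may not match your wire protocol.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helpers illustrating 4-byte big-endian length-prefixed framing.
object LengthPrefixedFraming {
  val HeaderSize = 4

  /** Prefix a UTF-8 payload with its byte length as a 4-byte big-endian integer. */
  def encodeFrame(payload: String): Array[Byte] = {
    val body = payload.getBytes(UTF_8)
    ByteBuffer.allocate(HeaderSize + body.length)
      .order(ByteOrder.BIG_ENDIAN)
      .putInt(body.length)
      .put(body)
      .array()
  }

  /** Split bytes into complete frames. Each frame keeps its header;
    * trailing bytes that do not form a complete frame are ignored. */
  def splitFrames(bytes: Array[Byte]): List[Array[Byte]] = {
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN)
    val frames = List.newBuilder[Array[Byte]]
    var complete = true
    while (complete && buf.remaining() >= HeaderSize) {
      // Peek at the length field without consuming it.
      val payloadLength = buf.getInt(buf.position())
      require(payloadLength >= 0, "negative length field")
      val frameSize = HeaderSize + payloadLength
      if (buf.remaining() >= frameSize) {
        val frame = new Array[Byte](frameSize)
        buf.get(frame)
        frames += frame
      } else {
        complete = false
      }
    }
    frames.result()
  }

  /** Drop the header and decode the remaining payload bytes as UTF-8. */
  def decodePayload(frame: Array[Byte]): String =
    new String(frame.drop(HeaderSize), UTF_8)
}
```

If the same holds for your protocol, the Akka pipeline would probably need something like `.map(_.drop(4))` between the framing stage and the `utf8String` decoding.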
Solution 2:[2]
An alternative (and in my view cleaner) way to code it (Akka 2.6.x), which also emphasizes the fact that there is no outbound flow, would be:
// framing and decoder stand for the framing and mapping stages from the question
val receivingPipeline = Flow[ByteString]
  .via(framing)
  .via(decoder)
  .to(mySink)
val sendingNothing = Source.never[ByteString]
conn.handleWith(Flow.fromSinkAndSourceCoupled(receivingPipeline, sendingNothing))
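For reference, here is a fuller sketch of this approach as a complete program, assuming Akka 2.6.x with `akka-stream` on the classpath. The object name `ConsumeOnlyServer`, the case class, and the `println` sink are placeholders, and `framing` and `decoder` are filled in with the stages from the question. Treat it as a starting point, not a tested implementation.

```scala
import java.nio.ByteOrder

import scala.util.{Failure, Success}

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Framing, Sink, Source, Tcp}
import akka.util.ByteString

object ConsumeOnlyServer {
  // Stand-in for the question's domain type.
  final case class SomethingMySinkUnderstand(text: String)

  // Same framing parameters as in the question:
  // 4-byte big-endian length field at offset 0, frames up to 65532 bytes.
  val framing: Flow[ByteString, ByteString, NotUsed] =
    Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)

  // Emitted frames still begin with the length field, so drop it before decoding.
  val decoder: Flow[ByteString, SomethingMySinkUnderstand, NotUsed] =
    Flow[ByteString].map(frame => SomethingMySinkUnderstand(frame.drop(4).utf8String))

  def main(args: Array[String]): Unit = {
    // In Akka 2.6 an implicit ActorSystem also supplies the materializer.
    implicit val system: ActorSystem = ActorSystem("stream-system")
    import system.dispatcher

    // Placeholder sink; replace it with the real one.
    val mySink = Sink.foreach[SomethingMySinkUnderstand](msg => println(msg))

    val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
      val receivingPipeline = Flow[ByteString].via(framing).via(decoder).to(mySink)
      // The outbound side never emits, so nothing is written back to the sender.
      conn.handleWith(Flow.fromSinkAndSourceCoupled(receivingPipeline, Source.never[ByteString]))
    }

    Tcp().bind("127.0.0.1", 6000).to(handler).run().onComplete {
      case Success(b) =>
        println("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        println("Server could not bind: " + e.getMessage)
        system.terminate()
    }
  }
}
```

Because the sink and source are coupled, the never-emitting source is cancelled as soon as the inbound side completes, for example when the sender closes the connection.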
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Taig |
Solution 2 | bobah |