Using UDP
UDP is a connectionless datagram protocol which offers two different ways of communication on the JDK level:
- sockets which are free to send datagrams to any destination and receive datagrams from any origin
- sockets which are restricted to communication with one specific remote socket address
In the low-level API the distinction is made, confusingly, by whether or not
connect has been called on the socket (even when connect has been called the
protocol is still connectionless). These two forms of UDP usage are offered
using distinct IO extensions described below.
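The JDK-level distinction can be seen without Akka at all. The following is a minimal sketch using only java.net.DatagramSocket on the loopback interface; the class and method names are made up for illustration. It shows that calling connect performs no handshake and only restricts which destination the socket will accept.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of Akka.
final class ConnectDistinction {

    /**
     * Returns true if a socket that has had connect() called on it refuses
     * to send a datagram addressed to a different destination.
     */
    static boolean connectedSocketRejectsOtherDestination() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket peerA = new DatagramSocket(0, loopback);
             DatagramSocket peerB = new DatagramSocket(0, loopback);
             DatagramSocket socket = new DatagramSocket(0, loopback)) {
            byte[] payload = "ping".getBytes(StandardCharsets.UTF_8);
            SocketAddress a = peerA.getLocalSocketAddress();
            SocketAddress b = peerB.getLocalSocketAddress();

            // Unconnected: every packet names its own destination.
            socket.send(new DatagramPacket(payload, payload.length, a));
            socket.send(new DatagramPacket(payload, payload.length, b));

            // "Connected": no handshake takes place; the socket is merely
            // restricted to a single remote address from now on.
            socket.connect(a);
            socket.send(new DatagramPacket(payload, payload.length, a));
            try {
                socket.send(new DatagramPacket(payload, payload.length, b));
                return false; // the restriction was not enforced
            } catch (IllegalArgumentException expected) {
                return true; // packet address differs from the connected address
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(connectedSocketRejectsOtherDestination());
    }
}
```

The two Akka extensions described below map onto these two modes, so an actor never calls connect directly.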
Unconnected UDP
Simple Send
public static class SimpleSender extends UntypedActor {
  final InetSocketAddress remote;
  public SimpleSender(InetSocketAddress remote) {
    this.remote = remote;
    // request creation of a SimpleSender
    final ActorRef mgr = Udp.get(getContext().system()).getManager();
    mgr.tell(UdpMessage.simpleSender(), getSelf());
  }
  @Override
  public void onReceive(Object msg) {
    if (msg instanceof Udp.SimpleSenderReady) {
      getContext().become(ready(getSender()));
    } else unhandled(msg);
  }
  private Procedure<Object> ready(final ActorRef send) {
    return new Procedure<Object>() {
      @Override
      public void apply(Object msg) throws Exception {
        if (msg instanceof String) {
          final String str = (String) msg;
          send.tell(UdpMessage.send(ByteString.fromString(str), remote), getSelf());
        } else unhandled(msg);
      }
    };
  }
}
The simplest form of UDP usage is to just send datagrams without expecting a
reply. To this end a “simple sender” facility is provided, as demonstrated
above. The UDP extension is queried using the simpleSender message, which is
answered by a SimpleSenderReady notification. The sender of that notification
is the newly created sender actor, which from this point onward can be used to
send datagrams to arbitrary destinations; in this example it will just send
any String it receives, UTF-8 encoded, to a predefined remote address.
Note
The simple sender will not shut itself down because it cannot know when you
are done with it. You will need to send it a PoisonPill when you want to close
the ephemeral port the sender is bound to.
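As a plain-JDK analogy (not Akka's implementation), the sketch below sends a UTF-8 encoded datagram from a socket bound to an ephemeral port and then closes it. Closing the socket is what releases the port, which is the role the PoisonPill plays for the simple sender actor. FireAndForget and its methods are hypothetical names used only for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of Akka.
final class FireAndForget {

    /** Sends one UTF-8 encoded datagram; no reply is awaited. */
    static void send(String text, SocketAddress remote) {
        // The no-arg constructor binds to an ephemeral port chosen by the OS.
        // try-with-resources closes the socket afterwards, releasing that port.
        try (DatagramSocket socket = new DatagramSocket()) {
            byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
            socket.send(new DatagramPacket(bytes, bytes.length, remote));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Loopback demo: returns the text as seen by a local receiver. */
    static String demo(String text) {
        try (DatagramSocket receiver =
                 new DatagramSocket(0, InetAddress.getLoopbackAddress())) {
            receiver.setSoTimeout(2000); // fail instead of blocking forever
            send(text, receiver.getLocalSocketAddress());
            DatagramPacket in = new DatagramPacket(new byte[1024], 1024);
            receiver.receive(in);
            return new String(in.getData(), 0, in.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("hello"));
    }
}
```

Unlike this sketch, which opens and closes a socket per message, the Akka simple sender keeps its port open across sends until it is stopped.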
Bind (and Send)
public static class Listener extends UntypedActor {
  final ActorRef nextActor;
  public Listener(ActorRef nextActor) {
    this.nextActor = nextActor;
    // request creation of a bound listen socket
    final ActorRef mgr = Udp.get(getContext().system()).getManager();
    mgr.tell(
      UdpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0)),
      getSelf());
  }
  @Override
  public void onReceive(Object msg) {
    if (msg instanceof Udp.Bound) {
      // b.localAddress() holds the address (and port) that was actually bound
      final Udp.Bound b = (Udp.Bound) msg;
      getContext().become(ready(getSender()));
    } else unhandled(msg);
  }
  private Procedure<Object> ready(final ActorRef socket) {
    return new Procedure<Object>() {
      @Override
      public void apply(Object msg) throws Exception {
        if (msg instanceof Udp.Received) {
          final Udp.Received r = (Udp.Received) msg;
          // echo server example: send back the data
          socket.tell(UdpMessage.send(r.data(), r.sender()), getSelf());
          // or do some processing and forward it on
          // (placeholder: parse the data here, e.g. using PipelineStage)
          final Object processed = r.data().utf8String();
          nextActor.tell(processed, getSelf());
        } else if (msg.equals(UdpMessage.unbind())) {
          socket.tell(msg, getSelf());
        } else if (msg instanceof Udp.Unbound) {
          getContext().stop(getSelf());
        } else unhandled(msg);
      }
    };
  }
}
If you want to implement a UDP server which listens on a socket for incoming
datagrams then you need to use the bind command as shown above. The local
address specified may have a zero port, in which case the operating system
will automatically choose a free port and assign it to the new socket. Which
port was actually bound can be found out by inspecting the Bound message.
The sender of the Bound message is the actor which manages the new socket.
Sending datagrams is achieved by using the send message type, and the socket
can be closed by sending an unbind command, in which case the socket actor
will reply with an Unbound notification.
Received datagrams are sent to the actor designated in the bind message,
whereas the Bound message will be sent to the sender of the bind.
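The zero-port behaviour and the echo pattern can also be tried at the JDK level. The sketch below uses plain java.net.DatagramSocket rather than the Akka API, and EphemeralBindEcho is a hypothetical name. It binds to port 0, reads back the port the operating system actually assigned, and echoes one datagram to its origin.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of Akka.
final class EphemeralBindEcho {

    /** Binds to port 0 and returns the port the OS actually assigned. */
    static int bindEphemeral() {
        try (DatagramSocket socket =
                 new DatagramSocket(0, InetAddress.getLoopbackAddress())) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Echoes one datagram on loopback and returns what the client got back. */
    static String echoOnce(String text) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket server = new DatagramSocket(0, loopback);
             DatagramSocket client = new DatagramSocket(0, loopback)) {
            server.setSoTimeout(2000);
            client.setSoTimeout(2000);
            // The real port is only known after binding.
            int boundPort = server.getLocalPort();

            byte[] out = text.getBytes(StandardCharsets.UTF_8);
            client.send(new DatagramPacket(out, out.length, loopback, boundPort));

            // Server side: receive, then reply to the datagram's origin.
            DatagramPacket request = new DatagramPacket(new byte[1024], 1024);
            server.receive(request);
            server.send(new DatagramPacket(
                request.getData(), request.getLength(), request.getSocketAddress()));

            DatagramPacket reply = new DatagramPacket(new byte[1024], 1024);
            client.receive(reply);
            return new String(reply.getData(), 0, reply.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(bindEphemeral() + " " + echoOnce("hello"));
    }
}
```

In the Akka version the same two pieces of information arrive as messages: the bound address in Bound, and the origin of each datagram in Received.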
Connected UDP
The service provided by the connection-based UDP API is similar to the
bind-and-send service we saw earlier, but the main difference is that a
connection is only able to send to the remoteAddress it was connected to, and
will receive datagrams only from that address.
public static class Connected extends UntypedActor {
  final InetSocketAddress remote;
  public Connected(InetSocketAddress remote) {
    this.remote = remote;
    // create a restricted a.k.a. “connected” socket
    final ActorRef mgr = UdpConnected.get(getContext().system()).getManager();
    mgr.tell(UdpConnectedMessage.connect(getSelf(), remote), getSelf());
  }
  @Override
  public void onReceive(Object msg) {
    if (msg instanceof UdpConnected.Connected) {
      getContext().become(ready(getSender()));
    } else unhandled(msg);
  }
  private Procedure<Object> ready(final ActorRef connection) {
    return new Procedure<Object>() {
      @Override
      public void apply(Object msg) throws Exception {
        if (msg instanceof UdpConnected.Received) {
          final UdpConnected.Received r = (UdpConnected.Received) msg;
          // process data, send it on, etc.
          if (r.data().utf8String().equals("hello")) {
            connection.tell(
              UdpConnectedMessage.send(ByteString.fromString("world")),
              getSelf());
          }
        } else if (msg instanceof String) {
          final String str = (String) msg;
          connection
            .tell(UdpConnectedMessage.send(ByteString.fromString(str)),
              getSelf());
        } else if (msg.equals(UdpConnectedMessage.disconnect())) {
          connection.tell(msg, getSelf());
        } else if (msg instanceof UdpConnected.Disconnected) {
          getContext().stop(getSelf());
        } else unhandled(msg);
      }
    };
  }
}
Consequently the example shown here looks quite similar to the previous one;
the biggest difference is the absence of remote address information in send
and Received messages.
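The restriction to a single remote address can be observed with a plain JDK socket as well. The sketch below is a loopback illustration with hypothetical names, not Akka code. It connects a DatagramSocket to one peer, shows that a datagram from a different origin never arrives, and replies with a packet that carries no destination address.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of Akka.
final class ConnectedFilter {

    private static DatagramPacket packet(String text, SocketAddress to) throws IOException {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        return new DatagramPacket(bytes, bytes.length, to);
    }

    private static String text(DatagramPacket p) {
        return new String(p.getData(), 0, p.getLength(), StandardCharsets.UTF_8);
    }

    /**
     * Returns "received,replied,dropped": what the connected socket received,
     * what the peer got back, and whether the stranger's datagram was dropped.
     */
    static String demo() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket peer = new DatagramSocket(0, loopback);
             DatagramSocket stranger = new DatagramSocket(0, loopback);
             DatagramSocket connected = new DatagramSocket(0, loopback)) {
            connected.connect(peer.getLocalSocketAddress());
            connected.setSoTimeout(500);
            peer.setSoTimeout(2000);
            SocketAddress target = connected.getLocalSocketAddress();

            stranger.send(packet("intruder", target)); // not the connected peer
            peer.send(packet("hello", target));

            DatagramPacket first = new DatagramPacket(new byte[64], 64);
            connected.receive(first);

            boolean strangerDropped;
            try {
                connected.receive(new DatagramPacket(new byte[64], 64));
                strangerDropped = false; // something else got through
            } catch (SocketTimeoutException e) {
                strangerDropped = true;  // nothing else was delivered
            }

            // Reply without naming a destination: the connected address is used.
            byte[] reply = "world".getBytes(StandardCharsets.UTF_8);
            connected.send(new DatagramPacket(reply, reply.length));
            DatagramPacket atPeer = new DatagramPacket(new byte[64], 64);
            peer.receive(atPeer);

            return text(first) + "," + text(atPeer) + "," + strangerDropped;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The address-free reply corresponds to UdpConnectedMessage.send taking only the payload.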
Note
There is a small performance benefit in using the connection-based UDP API over the connectionless one. If a SecurityManager is enabled on the system, every connectionless send has to go through a security check, whereas with connection-based UDP the security check is cached after connect, so writes do not suffer an additional performance penalty.
UDP Multicast
If you want to use UDP multicast you will need Java 7 or later. Akka provides
a way to control various options of DatagramChannel through the
akka.io.Inet.SocketOption interface. The example below shows how to set up a
receiver of multicast messages using the IPv6 protocol.
To select a protocol family you must extend the
akka.io.Inet.DatagramChannelCreator class, which implements
akka.io.Inet.SocketOption. Provide custom logic for opening a datagram channel
by overriding the create method.
public static class Inet6ProtocolFamily extends Inet.DatagramChannelCreator {
  @Override
  public DatagramChannel create() throws Exception {
    return DatagramChannel.open(StandardProtocolFamily.INET6);
  }
}
Another socket option will be needed to join a multicast group.
public static class MulticastGroup extends Inet.AbstractSocketOptionV2 {
  private String address;
  private String interf;
  public MulticastGroup(String address, String interf) {
    this.address = address;
    this.interf = interf;
  }
  @Override
  public void afterBind(DatagramSocket s) {
    try {
      InetAddress group = InetAddress.getByName(address);
      NetworkInterface networkInterface = NetworkInterface.getByName(interf);
      s.getChannel().join(group, networkInterface);
    } catch (Exception ex) {
      System.out.println("Unable to join multicast group.");
    }
  }
}
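Because afterBind above catches every exception and only prints a message, a wrong group address or interface name is easy to miss. NetworkInterface.getByName, for instance, returns null for an unknown name. The following JDK-only sketch shows pre-flight checks that could be run before building the option. MulticastPreflight and its methods are hypothetical helpers, not part of Akka.

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

// Hypothetical helper class, not part of Akka.
final class MulticastPreflight {

    /** True if the given literal IP address lies in a multicast range. */
    static boolean isMulticastGroup(String address) {
        try {
            return InetAddress.getByName(address).isMulticastAddress();
        } catch (UnknownHostException e) {
            return false; // not a resolvable address at all
        }
    }

    /** Names of the interfaces that are up and report multicast support. */
    static List<String> multicastCapableInterfaces() {
        List<String> names = new ArrayList<>();
        try {
            Enumeration<NetworkInterface> all = NetworkInterface.getNetworkInterfaces();
            if (all == null) {
                return names; // no interfaces available
            }
            for (NetworkInterface nif : Collections.list(all)) {
                if (nif.isUp() && nif.supportsMulticast()) {
                    names.add(nif.getName());
                }
            }
        } catch (SocketException e) {
            // Interfaces could not be queried; report none as usable.
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(isMulticastGroup("ff02::1"));
        System.out.println(multicastCapableInterfaces());
    }
}
```

A name taken from multicastCapableInterfaces() is a reasonable candidate for the interf argument of MulticastGroup, although joining can still fail for other reasons, such as the protocol family of the channel not matching the group address.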
Socket options must be provided to the UdpMessage.bind command.
List<Inet.SocketOption> options = new ArrayList<>();
options.add(new Inet6ProtocolFamily());
options.add(new MulticastGroup(group, iface));
final ActorRef mgr = Udp.get(getContext().system()).getManager();
// listen for datagrams on this address
InetSocketAddress endpoint = new InetSocketAddress(port);
mgr.tell(UdpMessage.bind(getSelf(), endpoint, options), getSelf());