C# - Network Command Processing with TPL Dataflow
I'm working on a system that involves accepting commands over a TCP network connection and sending responses once those commands have executed. Fairly basic stuff, but I'm looking to support a few requirements:
- Multiple clients can connect at the same time and establish separate sessions. Sessions can last as long or as short as desired, and the same client IP should be able to establish multiple parallel sessions, if desired.
- Each session can process multiple commands at the same time, since some of the requested operations can be performed in parallel.
I'd like to implement this cleanly using async/await and, based on what I've read, TPL Dataflow sounds like a good way to break the processing up into nice chunks that can run on the thread pool, instead of tying up threads for different sessions/commands and blocking on wait handles.
This is what I'm starting with (some parts stripped out to simplify, such as the details of exception handling; I've also omitted a wrapper that provides efficient awaitable network I/O):
    private readonly Task _serviceTask;
    private readonly Task _commandsTask;
    private readonly CancellationTokenSource _cancellation;
    private readonly BufferBlock<Command> _pendingCommands;

    public NetworkService(ICommandProcessor commandProcessor)
    {
        _commandProcessor = commandProcessor;
        IsRunning = true;
        _cancellation = new CancellationTokenSource();
        _pendingCommands = new BufferBlock<Command>();
        _serviceTask = Task.Run((Func<Task>)RunService);
        _commandsTask = Task.Run((Func<Task>)RunCommands);
    }

    public bool IsRunning { get; private set; }

    private async Task RunService()
    {
        _listener = new TcpListener(IPAddress.Any, ServicePort);
        _listener.Start();
        while (IsRunning)
        {
            Socket client = null;
            try
            {
                client = await _listener.AcceptSocketAsync();
                client.Blocking = false;
                var session = RunSession(client);
                lock (_sessions)
                {
                    _sessions.Add(session);
                }
            }
            catch (Exception ex)
            {
                // Handling here...
            }
        }
    }

    private async Task RunCommands()
    {
        while (IsRunning)
        {
            var command = await _pendingCommands.ReceiveAsync(_cancellation.Token);
            var task = Task.Run(() => RunCommand(command));
        }
    }

    private async Task RunCommand(Command command)
    {
        try
        {
            var response = await _commandProcessor.RunCommand(command.Content);
            Send(command.Client, response);
        }
        catch (Exception ex)
        {
            // Deal with general command exceptions here...
        }
    }

    private async Task RunSession(Socket client)
    {
        while (client.Connected)
        {
            var reader = new DelimitedCommandReader(client);
            try
            {
                var content = await reader.ReceiveCommand();
                _pendingCommands.Post(new Command(client, content));
            }
            catch (Exception ex)
            {
                // Exception handling here...
            }
        }
    }
The basics seem straightforward, but one part is tripping me up: how do I make sure that, when I'm shutting down the application, I wait for all pending command tasks to complete? I get a Task object when I use Task.Run to execute a command, but how do I keep track of the pending commands so that I can make sure all of them are complete before allowing the service to shut down?
I've considered using a simple List, removing commands from the list as they finish, but I'm wondering if I'm missing some basic tools in TPL Dataflow that would allow me to accomplish this more cleanly.
Edit:
Reading more about TPL Dataflow, I'm wondering if I should be using a TransformBlock with an increased MaxDegreeOfParallelism to allow processing commands in parallel? That sets an upper limit on the number of commands that can run in parallel, but that's a sensible limitation for my system, I think. I'm curious to hear from those who have experience with TPL Dataflow whether I'm on the right track.
Yeah, so... you're kinda half using the power of TPL Dataflow here. The fact that you're still manually receiving items from the BufferBlock in your own while loop in a background task is not the "way" you want to do it if you're subscribing to the TPL Dataflow style.
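What you would do is link an ActionBlock to the BufferBlock and do your command processing/sending within that. This is also the block on which you would set MaxDegreeOfParallelism to control how many concurrent commands you want to process. So that setup might look something like this: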
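    // Initialization logic to build up the TPL Dataflow pipeline.
    // Note: the block and the ICommandProcessor field share a name here; rename one in real code.
    _pendingCommands = new BufferBlock<Command>();
    _commandProcessor = new ActionBlock<Command>(this.ProcessCommand);
    // PropagateCompletion lets Complete() on the buffer flow through to the ActionBlock.
    _pendingCommands.LinkTo(_commandProcessor, new DataflowLinkOptions { PropagateCompletion = true });

    private async Task ProcessCommand(Command command)
    {
        var response = await _commandProcessor.RunCommand(command.Content);
        this.Send(command.Client, response);
    }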
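The MaxDegreeOfParallelism setting mentioned above is supplied through ExecutionDataflowBlockOptions when the block is constructed. The following is only a minimal sketch of that idea, not the service itself: the class and method names (ParallelismSketch, MeasurePeakConcurrencyAsync) are made up for illustration, integers stand in for commands, and Task.Delay stands in for real command work. It records how many handlers were running at once so the cap can be observed.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParallelismSketch
{
    // Pushes `itemCount` fake commands through an ActionBlock and returns the
    // highest number of handlers that were observed running at the same time.
    public static async Task<int> MeasurePeakConcurrencyAsync(int itemCount, int maxParallel)
    {
        int running = 0;
        int peak = 0;
        var gate = new object();

        var worker = new ActionBlock<int>(async _ =>
        {
            lock (gate)
            {
                running++;
                peak = Math.Max(peak, running);
            }

            await Task.Delay(20); // stand-in for real command processing

            lock (gate)
            {
                running--;
            }
        },
        // The block never runs more than `maxParallel` handlers concurrently.
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });

        for (int i = 0; i < itemCount; i++)
        {
            worker.Post(i);
        }

        worker.Complete();       // no more input will arrive
        await worker.Completion; // resolves once every posted item has been handled
        return peak;
    }
}
```

Calling MeasurePeakConcurrencyAsync(10, 3) should report a peak no higher than 3, whatever the thread pool happens to do. The default for MaxDegreeOfParallelism is 1, so without the options object the block handles one command at a time.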
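Then, in your shutdown code, you would need to signal that you're done adding items to the pipeline by calling Complete on the _pendingCommands BufferBlock, and then wait on the _commandProcessor ActionBlock to complete, to ensure that all the items have made their way through the pipeline. Note that completion only flows from the BufferBlock to the ActionBlock if the link between them was created with PropagateCompletion set to true. You do the waiting by grabbing the Task returned by the block's Completion property and calling Wait on it: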
    _pendingCommands.Complete();
    _commandProcessor.Completion.Wait();
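To make the shutdown sequence concrete, here is a small self-contained sketch under stated assumptions: the names (ShutdownSketch, DrainAsync) are hypothetical, integers stand in for commands, and a ConcurrentBag stands in for sending responses. It awaits Completion instead of calling Wait, which avoids blocking a thread when the shutdown path is itself async.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ShutdownSketch
{
    // Posts `commandCount` fake commands, shuts the pipeline down, and returns
    // how many commands had been processed by the time shutdown finished.
    public static async Task<int> DrainAsync(int commandCount)
    {
        var processed = new ConcurrentBag<int>();

        var pending = new BufferBlock<int>();
        var processor = new ActionBlock<int>(async id =>
        {
            await Task.Delay(10); // stand-in for running the command
            processed.Add(id);    // stand-in for sending the response
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Without PropagateCompletion, completing `pending` would never
        // complete `processor`, and the await below would never finish.
        pending.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < commandCount; i++)
        {
            pending.Post(i);
        }

        pending.Complete();         // signal: no more commands will be added
        await processor.Completion; // wait for in-flight commands to drain

        return processed.Count;
    }
}
```

Because the buffer only reports completion after it has handed off every item, and the ActionBlock only completes after its handlers have finished, DrainAsync(25) should return 25.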
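If you want to go for bonus points, you can even separate the command processing from the command sending. This would allow you to configure those steps separately from one another. For example, maybe you need to limit the number of threads processing commands, but want to have more sending out the responses. You would do this by simply introducing a TransformBlock into the middle of the flow: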
    _pendingCommands = new BufferBlock<Command>();
    _commandProcessor = new TransformBlock<Command, Tuple<Client, Response>>(this.ProcessCommand);
    _commandSender = new ActionBlock<Tuple<Client, Response>>(this.SendResponseToClient);
    // Propagate completion so that shutdown flows through every stage.
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    _pendingCommands.LinkTo(_commandProcessor, linkOptions);
    _commandProcessor.LinkTo(_commandSender, linkOptions);

    private async Task<Tuple<Client, Response>> ProcessCommand(Command command)
    {
        var response = await _commandProcessor.RunCommand(command.Content);
        return Tuple.Create(command.Client, response);
    }

    private void SendResponseToClient(Tuple<Client, Response> clientAndResponse)
    {
        this.Send(clientAndResponse.Item1, clientAndResponse.Item2);
    }
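You probably want to use your own data structure instead of Tuple; it was just for illustrative purposes. The point is that this is exactly the kind of structure you want to use to break up the pipeline, so that you can control the various aspects of it exactly how you might need to.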
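As a rough illustration of swapping the Tuple for a dedicated type, the sketch below uses two hypothetical classes (DemoCommand and CommandResult) and a made-up TwoStageSketch.RunAsync helper. An integer client id stands in for the socket, upper-casing the content stands in for real command processing, and a queue stands in for the network send. None of these names come from the original code.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical message types that replace Tuple<Client, Response>.
public sealed class DemoCommand
{
    public DemoCommand(int clientId, string content) { ClientId = clientId; Content = content; }
    public int ClientId { get; }
    public string Content { get; }
}

public sealed class CommandResult
{
    public CommandResult(int clientId, string response) { ClientId = clientId; Response = response; }
    public int ClientId { get; }
    public string Response { get; }
}

public static class TwoStageSketch
{
    // Runs each content string through a processing stage and a sending stage,
    // returning the "sent" messages formatted as "clientId:response".
    public static async Task<string[]> RunAsync(params string[] contents)
    {
        var sent = new ConcurrentQueue<string>();

        // Stage 1: process commands, at most two at a time.
        var process = new TransformBlock<DemoCommand, CommandResult>(async cmd =>
        {
            await Task.Yield(); // stand-in for asynchronous command execution
            return new CommandResult(cmd.ClientId, cmd.Content.ToUpperInvariant());
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

        // Stage 2: send responses, configured independently of stage 1
        // (left at the default of one at a time here).
        var send = new ActionBlock<CommandResult>(
            result => sent.Enqueue(result.ClientId + ":" + result.Response));

        process.LinkTo(send, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < contents.Length; i++)
        {
            process.Post(new DemoCommand(i, contents[i]));
        }

        process.Complete();
        await send.Completion; // wait on the last block in the chain
        return sent.ToArray();
    }
}
```

Named properties such as ClientId and Response read more clearly at the call site than Item1 and Item2, and each stage keeps its own ExecutionDataflowBlockOptions, which is the separation the paragraph above describes.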