Fast Broadcast

Oct 13, 2011 at 3:53 PM

Does anyone know the best way to broadcast the same message to all active connections in parallel?

The example in QuickStart.BroadcastService:

  Async.Run(() =>
                {
                    sessions.ForEach(s => s.SendResponse(message));
                });
Would seem to me to send to the connections in serial, but on a thread independent to the BroadcastMessage() handler.

I tried an alternative:
public void Broadcast(string message)
        {
            var sessions = this.GetAllSessions();
 
            foreach (var s in sessions)
            {
                SuperSocket.Common.Async.Run(() =>
                    {
                        s.SendResponse(message);
                    });
            }
        }
Which sent the message twice to my first connection and zero times to my second connection.

Would ThreadPool.QueueUserWorkItem be a better approach?

Thanks,
Jason

SuperSocket.Common
Coordinator
Oct 13, 2011 at 3:59 PM

Which sent the message twice to my first connection and zero times to my second connection?

Can you confirm it again?

Zero times to my second connection is probable, because the sessions return by this.GetAllSessions() come from session snapshot, and the snapshot cannot be updated real time.

 

Actually,

Async.Run(...) is using ThreadPool.QueueUserWorkItem already!


 


Oct 13, 2011 at 4:07 PM

>>Can you confirm it again?

Confirmed!  That code sends the message to the first connection equal to the number of total active connections.  I've tested 1,2,3,4,5.  5 messages sent to my first telnet, zero sent to each of the rest of them.

 

Maybe I am not using Async.Run correctly?


Coordinator
Oct 13, 2011 at 4:16 PM

It might be caused by the threading issues when use foreach and async together.

You can try access sessions by array:

var sessions = server.GetAllSessions().ToArray();

for(var i= 0; i < sessions.Length; i++)

{

     var s = sessions[i];

     Async.Run(...);

}

Oct 13, 2011 at 5:00 PM
Edited Oct 13, 2011 at 5:00 PM

Yes, that worked, thanks.

 

It may be a good addition to Super Sockets to have a Session.SendResponseAsync() alternative.

SendResponse
Oct 13, 2011 at 5:03 PM

So what is the alternative to GetAllSessions()? if you saying it's a snapshot, then it can not be used safely to retrieve all open sessions?

Oct 13, 2011 at 5:11 PM

There is a config option to turn off the snapshot behavior. 

DisableSessionSnapshot

                if (Config.DisableSessionSnapshot)
                    return m_SessionDict.ToArray();
                else
                    return m_SessionsSnapshot;
            

Oct 13, 2011 at 6:34 PM
Edited Oct 13, 2011 at 11:53 PM

Thanx, I should have looked at the code myself, it's obvious.

Jan 3, 2012 at 10:15 AM
jmacentee wrote:
public void Broadcast(string message)
        {
            var sessions = this.GetAllSessions();
 
            foreach (var s in sessions)
            {
                SuperSocket.Common.Async.Run(() =>
                    {
                        s.SendResponse(message);
                    });
            }
        }
Which sent the message twice to my first connection and zero times to my second connection.
SuperSocket.Common

This behavior is sometimes called a "modified closure".

() => { s.SendResponse(message); } is an anonymous delegate, that will use the value s has at execution time (that is when the schedulled task is finally being executed).

Normaly, the foreach-Loop should by then have completed.

For a foreach loop the loop variable is defined outside the loop body, so technically the same "s" is reused for all iterations, so all the Tasks that are created point at the same variable.

Another way around this issue could be copying the value of s to a new variable scoped to the loop body like this:

            foreach (var s in sessions)
            {
var currentSession = s;
                 SuperSocket.Common.Async.Run(() =>                     {                        currentSession.SendResponse(message);                     });             }

This is similar to the solution using a for-loop. There the loop-variable is i, and for each iteration a new variable s is created using the corresponding value from the list. Thus every Task uses a different s with just the right value.

You can do some experiments if you like, maybe using a console program:

var actions = new List<Action>();

for (int i = 0; i < 10; i++)
{
  Action action = () => Console.WriteLine(i);
  actions.Add(action);
}

foreach (var action in actions)
{
  action.Invoke();
}

actions.Clear();

for (int i = 0; i < 10; i++)
{
  int current = i;
  Action action = () => Console.WriteLine(current);
  actions.Add(action);
}

foreach (var action in actions)
{
  action.Invoke();
}

Coordinator
Jan 3, 2012 at 2:39 PM

Yes, Khyalis

you are correct!

Jun 7, 2013 at 11:19 PM
Khyalis wrote:
s.SendResponse(message);
Since Khyalis posted this, SendResponse has been deprecated. Instead use Send(ArraySegment<byte> segment)

You could also take a cue from the souce. The following method is in SuperSocket.SocketBase.AppServer<TAppSession, TRequestInfo>
/// <summary>
/// Clears the idle session.
/// </summary>
/// <param name="state">The state.</param>
private void ClearIdleSession(object state)
{
    if (Monitor.TryEnter(state))
    {
        try
        {
            DateTime now = DateTime.Now;
            DateTime timeOut = now.AddSeconds(0 - Config.IdleSessionTimeOut);

            var timeOutSessions = SessionSource.Where(s => s.Value.LastActiveTime <= timeOut).Select(s => s.Value);
            System.Threading.Tasks.Parallel.ForEach(timeOutSessions, s =>
                {
                    if (Logger.IsInfoEnabled)
                        Logger.Info(s, string.Format("The session will be closed for {0} timeout, the session start time: {1}, last active time: {2}!", now.Subtract(s.LastActiveTime).TotalSeconds, s.StartTime, s.LastActiveTime));
                    s.Close(CloseReason.TimeOut);
                });
        }
        catch (Exception e)
        {
            if(Logger.IsErrorEnabled)
                Logger.Error("Clear idle session error!", e);
        }
        finally
        {
            Monitor.Exit(state);
        }
    }
}
It is used to close the sessions, but could just as easily be used to call Send on them.

Also, check out this excellent blog post for other options and explantions as to how the different options (including the above example) effect thread blocking, etc:
Make Async I/O Work For You