Многопоточность 4-уровневая смешанная модель производителя-потребителя — потоки не чередуются должным образом

Вот проблема, которую я пытаюсь смоделировать: у меня есть входящий колл-центр (без исходящих звонков). Есть 3 уровня сотрудников: первокурсники, технический руководитель, руководитель проекта. В нашем колл-центре всего один TL, один PM и несколько Freshers. Вызовы, которые не могут обработать первокурсники, передаются TL, а вызовы, которые не может обработать TL, передаются PM.

Мне нужно использовать подход OO C#.

Вот моя текущая попытка: я использовал C # ConcurrentQueue, так как думал, что он позаботится о «блокировках». Я хочу FIFO для колл-центра. Я сделал очередь для каждого уровня.

У меня есть производитель (звонящие), добавляющий вызовы в первую очередь. Смешанные новички продукта/потребителя проверяют вызов (принимают или передают в следующую очередь). Смешанный производственный/потребительский tl, затем чисто потребительский менеджер проекта.

Результат не тот, что я ожидаю. Запускается только первая свеженькая а менеджер проектов вообще не запускается. Я ожидаю намного больше чередования по мере добавления звонков в очередь.

Мой код ниже. У кого-нибудь есть лучший подход к этой проблеме или что-то не так с моим кодом?

В классе CallCenter происходит большая часть действий.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Collections.Concurrent;
namespace CallCenterThreaded
{
/// <summary>
/// 
/// MIXED CONSUMER/PRODUCER TYPE
/// </summary>
class Fresher
{
    // add a class variable for the number of Employee objects instantiated
    private static int fresherNum = 0;
    private int fresherId;
    private ConcurrentQueue<Call> incommingCalls;
    private ConcurrentQueue<Call> outgoingCalls;

    public Fresher(ConcurrentQueue<Call> calls, ConcurrentQueue<Call> outcalls)
    {
        fresherId = ++fresherNum;
        incommingCalls = calls;
        outgoingCalls = outcalls;
        //start the producer thread
        Thread thread = new Thread(new ThreadStart(HandleCalls));
        thread.Start();
    }

    public int ID
    {
        get { return fresherId; }
    }

    /// <summary>
    /// 
    /// </summary>
    public void HandleCalls() 
    {

        while (incommingCalls.Count > 0)
        {

            Call call;
            incommingCalls.TryDequeue(out call);

            if (call != null)
            {
                if (call.EscalationLevel == 0)
                {
                    TakeCalls(call);
                }
                else 
                {
                    TransferCalls(call);
                }
            }

        }

    }

    /// <summary>
    /// Transfer to the TL
    /// </summary>
    public void TransferCalls(Call call)
    {
        outgoingCalls.Enqueue(call);
        Console.WriteLine(".......Fresher {0} escalated call {1}", ID, call.CallID);
    }

    /// <summary>
    /// Consume the calls
    /// </summary>
    public void TakeCalls(Call call)
    {
        Console.WriteLine("Fresher {0} handled call {1}", ID, call.CallID);
    }



}
}
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Collections.Concurrent;
   namespace CallCenterThreaded
    {
    /// <summary>
    /// MIXED CONSUMER/PRODUCER TYPE
    /// </summary>
    class TechnicalLead
    {

    // add a class variable for the number of Employee objects instantiated
    private static int tlNum = 0;
    private int tlId;

    private ConcurrentQueue<Call> incommingCalls;
    private ConcurrentQueue<Call> outgoingCalls;

    public TechnicalLead(ConcurrentQueue<Call> calls, ConcurrentQueue<Call> outcalls)
    {
        tlId = ++tlNum;
        incommingCalls = calls;
        outgoingCalls = outcalls;
        //start the producer thread
        Thread thread = new Thread(new ThreadStart(HandleCalls));
        thread.Start();
    }

    public int ID
    {
        get { return tlId; }
    }

    /// <summary>
    /// 
    /// </summary>
    public void HandleCalls() 
    {

        while (incommingCalls.Count > 0)
        {

            Call call;
            incommingCalls.TryDequeue(out call);

            if (call != null)
            {
                if (call.EscalationLevel == 1)
                {
                    TakeCalls(call);
                }
                else 
                {
                    TransferCalls(call);
                }
            }

        }

    }
    /// <summary>
    /// Transfer to the PM
    /// </summary>
    public void TransferCalls(Call call)
    {
        //Console.WriteLine(".......Technical Lead {0} escalated call {1}", ID, call.CallID);
        outgoingCalls.Enqueue(call);           
    }

    /// <summary>
    /// Consume the calls
    /// </summary>
    public void TakeCalls(Call call)
    {
        Console.WriteLine("Technical Lead {0} handled call {1}", ID, call.CallID);
    }
}
}
    using System;
   using System.Collections.Generic;
 using System.Linq;
 using System.Text;
   namespace CallCenterThreaded
   {
class Call
{
    private static int numberCalls = 0;
    private int callno;

    private int esclataion;


    public Call() 
    {
        callno = ++numberCalls;
        esclataion = 0;
        if(callno % 3 == 0)
        {
            esclataion = 1;
        }
        if(callno % 5 == 0)
        {
            esclataion = 2;
        }

    }

    public int CallID { get { return callno; } }
    public int EscalationLevel { get { return esclataion; } }
}
}
 using System;
 using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Collections.Concurrent;
namespace CallCenterThreaded
{
/// <summary>
/// 
/// </summary>
class CallCenter
{

    private ConcurrentQueue<Call> fresherCalls;
    private ConcurrentQueue<Call> tlCalls;
    private ConcurrentQueue<Call> pmCalls;

    private List<Caller> myCallers;
    private List<Fresher> myFreshers;

    private TechnicalLead tl;
    private ProjectManager pm;

    public CallCenter() 
    {
        //initial incomming calls to the fresher queue
        fresherCalls = new ConcurrentQueue<Call>();
        tlCalls = new ConcurrentQueue<Call>();
        pmCalls = new ConcurrentQueue<Call>();

        myFreshers = new List<Fresher>();
        myCallers = new List<Caller>();

        generate_callers();
        generate_freshers();

        tl = new TechnicalLead(tlCalls, pmCalls);
        pm = new ProjectManager(pmCalls);
    }

    /// <summary>
    /// 
    /// </summary>
    private void generate_freshers() 
    {
        for (int i = 0; i < 5; i++ )
        {
            myFreshers.Add(new Fresher(fresherCalls, tlCalls));
        }
    }

    /// <summary>
    /// 
    /// </summary>
    private void generate_callers() 
    {
        for (int i = 0; i < 5; i++ )
        {
            myCallers.Add(new Caller(fresherCalls));
        }
    }

   }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Collections.Concurrent;
namespace CallCenterThreaded
{
/// <summary>
/// PURE CONSUMER
/// </summary>
class ProjectManager
{

     // add a class variable for the number of Employee objects instantiated
    private static int pmNum = 0;
    private int pmId;

    private ConcurrentQueue<Call> incommingCalls;

    public ProjectManager(ConcurrentQueue<Call> calls)
    {
        pmId = ++pmNum;
        incommingCalls = calls;

        //start the producer thread
        Thread thread = new Thread(new ThreadStart(HandleCalls));
        thread.Start();
    }

    public int ID
    {
        get { return pmId; }
    }

    /// <summary>
    /// 
    /// </summary>
    public void HandleCalls() 
    {

        while (incommingCalls.Count > 0)
        {

            Call call;
            incommingCalls.TryDequeue(out call);

            if (call != null && call.EscalationLevel == 2)
            {
                TakeCalls(call); 
            }

        }

    }
    /// <summary>
    /// Consume the calls
    /// </summary>
    public void TakeCalls(Call call)
    {
        Console.WriteLine("Project Manager {0} handled call {1}", ID, call.CallID);
    }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Collections.Concurrent;
namespace CallCenterThreaded
{
class MainClass
{
    static void Main(string[] args)
    {

        CallCenter myCenter = new CallCenter();

        Console.ReadLine();
    }
}
}
    using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using System.Threading;
namespace CallCenterThreaded
{
/// <summary>
/// This is a producer. It produces calls and adds them to the queue.
/// PURE PRODUCER TYPE
/// </summary>
class Caller
{

    private ConcurrentQueue<Call> incommingCalls;

    public Caller(ConcurrentQueue<Call> calls)
    {
        incommingCalls = calls;
        //start the producer thread
        Thread thread = new Thread(new ThreadStart(placeCalls));
        thread.Start();
    }

    public void placeCalls()
    {

        //place 10 calls
        for (int callno = 0; callno < 4; callno++)
        {
            //Console.WriteLine("Call {0} was added to the queue", callno);
            incommingCalls.Enqueue(new Call());
        }

    }

}
}

person user1261710    schedule 27.01.2013    source источник
comment
Это много кода. Вы можете изучить BlockingCollection, а не ConcurrentQueue, так как он выполняет большую часть работы за вас. BlockingCollection — это очередь FIFO внизу (в ней используется ConcurrentQueue).   -  person Jim Mischel    schedule 28.01.2013


Ответы (1)


Проблема в методах HandleCalls():

public void HandleCalls() 
{
    while (incommingCalls.Count > 0)
    {
        …
    }
}

Что делает этот код, так это проверяет incomingCalls и, если их нет, он немедленно выходит из потока. Как будто работник пришел на работу, посмотрел свою очередь звонков, ничего там не нашел (потому что пришел на работу одновременно с фрешами, которые должны переадресовывать звонки) и вот ушел и пошел домой.

Что вам нужно сделать, так это подождать, если в данный момент нет работы. Вероятно, лучший способ сделать это — использовать BlockingCollection вместо ConcurrentQueue.

Кроме того, ваш дизайн не кажется таким уж хорошим. Например, много повторяющегося кода.

person svick    schedule 28.01.2013
comment
Спасибо, вы знаете Java-эквивалент BlockingCollection? - person user1261710; 28.01.2013
comment
Я переключил все на BlockingCollection. Сначала звонки принимал только Fresher 1. Только если я генерирую обновления перед любыми вызовами, вывод выглядит немного более реалистичным. Это потому, что они находятся в состоянии ожидания, так как в очереди нет вызовов. Есть ли способ заставить их ждать после создания экземпляра. И да... Мне действительно нужен абстрактный класс, чтобы очистить код... - person user1261710; 28.01.2013
comment
Мне не нравится foreach (строковое значение в blockingCollection.GetConsumingEnumerable()) { Console.WriteLine(Worker 1: + value); } Изменяет порядок элементов в блокирующей коллекции. Есть ли альтернатива? - person user1261710; 28.01.2013
comment
Кажется, у меня есть условия гонки с BlockingCollection - person user1261710; 28.01.2013
comment
@user1261710 user1261710 Тогда, я думаю, вам следует задать новый вопрос. В идеале включайте только соответствующий код. - person svick; 28.01.2013