How to prepare a data set for the main thread in an additional thread and stay thread-safe?
Description: (The following is only a low-level description.)
Assume you have a list of machines, a list of people, and a list of tasks. The corresponding data arrives in different UDP packets. Each task has an executor (a person or a machine) and is assigned to a target object (again a person or a machine). To avoid iterating through many tasks every time, my data fusion thread assigns each task directly to its target (person or machine) and creates connectors that contain the corresponding executor (again a person or a machine). For the fusion I have different subsystems: one creates a dictionary of persons, one a dictionary of machines, and one a dictionary of tasks. After that, another subsystem assigns the tasks to the different entities (persons or machines), which are both of type ITaskable. After that, yet another subsystem groups, for example, different persons into one GroupEntity, and so on.
In the main thread I have a PersonManagement and a MachineManagement, which create the visible objects for the different data elements, move them, and so on. A Taskmanagement creates a visualisation of all tasks of a selected ITaskable element. A GroupManagement generates groups of different entities, visualizes them, and so on.
So in the end, while the main thread does its job with the current complete set of data (all persons and machines with their tasks assigned, as well as a TaskDict and a GroupDict), I want the fusion thread to fuse the next received UDP packets, and that is exactly my problem.
Problem:
I don't want to block the main thread: if no new complete set of data is available after a complete main-thread cycle, the main thread has to carry on with the old set rather than wait for the new one. Unfortunately, the main thread works with references to the data sets (PersonDict, TaskDict, MachineDict), so the old and the new data set are basically the same objects, because the data fusion only updates specific entities or adds new ones to the dictionaries. Since I don't lock the data fusion, it keeps modifying those objects while the main thread still holds the old reference to them. How can I make this thread-safe without locking too much?
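To make the requirement concrete, here is a minimal sketch of one common way to get this behaviour: the fusion thread builds a completely new set of dictionaries and publishes it by swapping a single reference, so the main thread keeps reading the old set until the swap happens. The names DataSnapshot, SnapshotExchange, Publish and Current are hypothetical and do not come from the question, and string values stand in for the real person and task types.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical snapshot type: once published, it is never modified again.
public sealed class DataSnapshot
{
    public IReadOnlyDictionary<int, string> Persons { get; }
    public IReadOnlyDictionary<int, string> Tasks { get; }

    public DataSnapshot(Dictionary<int, string> persons, Dictionary<int, string> tasks)
    {
        Persons = persons;
        Tasks = tasks;
    }
}

public static class SnapshotExchange
{
    // The only shared mutable state: a reference to the latest complete snapshot.
    private static DataSnapshot _current =
        new DataSnapshot(new Dictionary<int, string>(), new Dictionary<int, string>());

    // Fusion thread: build fresh dictionaries first, then publish them in one atomic reference write.
    public static void Publish(DataSnapshot next) => Volatile.Write(ref _current, next);

    // Main thread: never blocks; returns the previous snapshot until a new one has been published.
    public static DataSnapshot Current => Volatile.Read(ref _current);
}
```

The main thread would read SnapshotExchange.Current once per cycle and use only that reference for the whole cycle. This only works if the fusion thread copies entities into the new snapshot instead of updating the ones the main thread may still be reading.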
Solution 1:[1]
This may be what you are looking for: https://github.com/tcwicks/ChillX/tree/master/src/ChillX.Threading
Specifically, ThreadedWorkItemProcessor or AsyncThreadedWorkItemProcessor (both appear in the usage example below).
Note: the two implementations are interchangeable.
I built this specifically for a socket server that runs microservices.
A usage example for your specific scenario:
public class ExampleBulkControllerForSocketDataProcessor
{
    public struct MyWorkItem
    {
        public int Id;
        public string Name;
        public string Description;
    }

    public enum WorkItemPriority
    {
        Low = 0,
        Medium = 1,
        High = 2,
    }

    /// <summary>
    /// Call this method with each incoming packet of data
    /// </summary>
    public void ScheduleDataPacketForProcessing(byte[] buffer, int clientID, WorkItemPriority priority)
    {
        ThreadedProcessorExample.ScheduleWorkItem(priority, buffer, clientID);
    }

    /// <summary>
    /// Call this method to retrieve processed results
    /// </summary>
    /// <param name="success">True if a processed work item was available. False if no processed work items are available</param>
    /// <returns>The processed work item, or the default value if none was available</returns>
    public MyWorkItem GetProcessedData(out bool success)
    {
        BulkProcessor.ThreadWorkItem<byte[], MyWorkItem, int> workItem;
        if (ThreadedProcessorExample.TryGetProcessedWorkItem(out workItem))
        {
            success = true;
            return workItem.Response;
        }
        success = false;
        return default;
    }

    // Use this
    private static Threading.BulkProcessor.ThreadedWorkItemProcessor<byte[], MyWorkItem, int, WorkItemPriority> ThreadedProcessorExample = new Threading.BulkProcessor.ThreadedWorkItemProcessor<byte[], MyWorkItem, int, WorkItemPriority>(
        _maxWorkItemLimitPerClient: 100 // Maximum number of concurrent requests in the processing queue per client. Set to int.MaxValue to disable concurrent request caps
        , _maxWorkerThreads: 4 // Maximum number of threads to scale up to
        , _threadStartupPerWorkItems: 4 // Consider starting a new processing thread every X requests
        , _threadStartupMinQueueSize: 4 // Do NOT start a new processing thread if the work item queue is below this size
        , _idleWorkerThreadExitSeconds: 10 // Idle threads will exit after X seconds
        , _processedQueueMaxSize: 100 // Not yet implemented
        , _processedItemAutoDispose: false // If true the work items will be discarded once processing is complete
        , _processRequestMethod: ProcessRequestMethod // Your Do Work method for processing the request
        , _logErrorMethod: Handler_LogError
        , _logMessageMethod: Handler_LogMessage
        );

    // Or comment out the above and uncomment this
    //private static Threading.BulkProcessor.AsyncThreadedWorkItemProcessor<byte[], MyWorkItem, int, WorkItemPriority> ThreadedProcessorExample = new Threading.BulkProcessor.AsyncThreadedWorkItemProcessor<byte[], MyWorkItem, int, WorkItemPriority>(
    //    _maxWorkItemLimitPerClient: 100 // Maximum number of concurrent requests in the processing queue per client. Set to int.MaxValue to disable concurrent request caps
    //    , _threadStartupPerWorkItems: 4 // serves no purpose in this implementation. Provided only for interchangeability with ThreadedWorkItemProcessor
    //    , _threadStartupMinQueueSize: 4 // serves no purpose in this implementation. Provided only for interchangeability with ThreadedWorkItemProcessor
    //    , _idleWorkerThreadExitSeconds: 10 // serves no purpose in this implementation. Provided only for interchangeability with ThreadedWorkItemProcessor
    //    , _idleWorkerThreadExitSeconds: 10 // Idle threads will exit after X seconds
    //    , _processedQueueMaxSize: 100 // Not yet implemented
    //    , _processedItemAutoDispose: false // If true the work items will be discarded once processing is complete
    //    , _processRequestMethod: ProcessRequestMethod // Your Do Work method for processing the request
    //    , _logErrorMethod: Handler_LogError
    //    , _logMessageMethod: Handler_LogMessage
    //    );

    private static MyWorkItem ProcessRequestMethod(byte[] request)
    {
        // Do whatever processing is desired and create the response
        return new MyWorkItem();
    }

    public static void Handler_LogError(Exception ex)
    {
        // Log unhandled exception here
    }

    private static void Handler_LogMessage(string _message, bool _isError, bool _isWarning)
    {
        // Log message here
    }
}
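On the main-thread side, GetProcessedData follows a "try get" pattern: it returns immediately and reports through success whether anything was ready. The sketch below shows that polling pattern with the standard ConcurrentQueue<T> as a stand-in; it is not the ChillX implementation, and PollingSketch, Complete and DrainReady are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Concurrent;

public static class PollingSketch
{
    // Stand-in for the processed-results queue (assumption: not the library's internal queue).
    private static readonly ConcurrentQueue<string> Processed = new ConcurrentQueue<string>();

    // Worker side: hand over a finished result.
    public static void Complete(string result) => Processed.Enqueue(result);

    // Main-thread side: apply whatever is ready right now and return without waiting.
    public static int DrainReady(Action<string> apply)
    {
        int count = 0;
        while (Processed.TryDequeue(out string item))
        {
            apply(item);
            count++;
        }
        return count;
    }
}
```

Calling DrainReady once per main-thread cycle means that a cycle with no finished results costs a single failed TryDequeue and the main thread carries on with the data it already has.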
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | tcwicks |