C# 24 Study Notes Concurrency by Asynchronous
24.Concurrency by async
📌Asynchronicity and scalability
24.1.Implement Asynchronous method
📌What is Asynchronous method?
An asynchronous method is one that does not block the current thread on which it starts to run.
📌What happened after invoking an asynchronous method?
Once invokes, the method will return control to the calling environment and to perform its work on a separate thread.
📌What should you use?
await
and async
24.1.1. Problem📃 & Solution🔨
The Problem
Suppose you have a method called slowMethod
which is invoked by a UI event, e.g. left mouse click. Meanwhile, the methods have to do it one after another.
xxxxxxxxxx
private void slowMethod()
{
doFirstLongRunningOperation();
doSecondLongRunningOperation();
doThirdLongRunningOperation();
message.Text = "Processing Completed";
}
private void doFirstLongRunningOperation()
{
...
}
private void doSecondLongRunningOperation()
{
...
}
private void doThirdLongRunningOperation()
{
...
}
The preceding problem is that the UI thread(main thread) will be frozen until the 3rd method completed.
1️⃣Implement method with
Task
, ❌
xxxxxxxxxx
private void slowMethod()
{
Task task = new Task(doFirstLongRunningOperation);
task.ContinueWith(doSecondLongRunningOperation);
task.ContinueWith(doThirdLongRunningOperation);
task.Start();
message.Text = "Processing Completed"; // this method executes right after task.Start()
}
private void doFirstLongRunningOperation()
{
...
}
private void doSecondLongRunningOperation(Task t)
{
...
}
private void doThirdLongRunningOperation(Task t)
{
...
}
The preceding problem is that the message.Text
will not wait for the task
end😢. It pops up the message right after the task.Start();
.
2️⃣Implement with
Task
andWait
, ❌
xxxxxxxxxx
private void slowMethod()
{
Task task = new Task(doFirstLongRunningOperation);
task.ContinueWith(doSecondLongRunningOperation);
task.ContinueWith(doThirdLongRunningOperation);
task.Start();
task.Wait(); //Block again!!
message.Text = "Processing Completed";
}
The preceding problem is that the thread still waits for the task.Wait()
which is meaningless. The UI thread will block the interface again.
3️⃣Implement with
Task
and define continuation❌
xxxxxxxxxx
private void slowMethod()
{
Task task = new Task(doFirstLongRunningOperation);
task.ContinueWith(doSecondLongRunningOperation);
task.ContinueWith(doThirdLongRunningOperation);
task.ContinueWith((t) => message.Text = "Processing Complete"); //execute in another thread
task.Start();
}
The preceding problem is "The application called an interface that was marshaled for a different thread".
What does it mean?🤔
In C#, only the main thread(UI thread) has the right to modify UI. Other threads don't have the right.
4️⃣Implement with
Task
, define continuation, and useDispatcher
. 🆗😶
xxxxxxxxxx
private void slowMethod()
{
Task task = new Task(doFirstLongRunningOperation);
task.ContinueWith(doSecondLongRunningOperation);
task.ContinueWith(doThirdLongRunningOperation);
task.ContinueWith((t) => this.Dispatcher.RunAsync(
CoreDispatcherPriority.Normal,
() => message.Text = "Processing Complete"));
task.Start();
}
The Dispatcher
object is a component of the user interface infrastructure, and you can send it requests to perform work on the user interface thread by calling its RunAsync
method. Although this works, but it is messy and hard to maintain the code.
The Solution
The keywords async
and await
are to tackle such problem and you don't have to concern to use Dispatcher
.
There are few things worth discussed.
📌what should be the operand of
await
?
The thing right next to await
is called operand, e.g. doFirstLongRunningOperation
is the operand of await
. The operand must be a Task
. A.k.a. The return type of the method is Task
.
📌what is the mechanism behind?
It is very similar to using Dispatcher
.
async
:
does - ✔️ specify that the code in the method can be divided into one or more continuations.
does not - ❌ signify that a method runs asynchronously on a separate thread
await
:
does - ✔️ specifies when the C# compiler can split the code into a continuation. The right hand side of await
is an awaitable
object is a type that provides the GetAwaiter
method.
🔨Solution 1
xxxxxxxxxx
private async void slowMethod()
{
await doFirstLongRunningOperation();
await doSecondLongRunningOperation();
await doThirdLongRunningOperation();
message.Text = "Processing Complete";
}
//the method returns a Task!
private Task doFirstLongRunningOperation()
{
Task t = Task.Run(() => { /* original code goes here */ });
return t;
}
private Task doSecondLongRunningOperation()
{
Task t = Task.Run(() => { /* original code goes here */ });
return t;
}
private Task doThirdLongRunningOperation()
{
Task t = Task.Run(() => { /* original code goes here */ });
return t;
}
🔨Solution 2
The preceding solution has 1 constraint. What if I want to split one long running operation into few parallelable task?
xxxxxxxxxx
private Task doFirstLongRunningOperation()
{
Task task1 = Task.Run(() => { /* Task 1 code goes here */ });
Task task2 = Task.Run(() => { /* Task 2 code goes here */ });
return ...; //which task should I return?
}
The solution is to make the method async
as well.
xxxxxxxxxx
private async Task doFirstLongRunningOperation()
{
Task task1 = Task.Run(() => { /* Task 1 code goes here */ });
Task task2 = Task.Run(() => { /* Task 2 code goes here */ });
await task1;
await task2;
}
In the main code should look like this:
xxxxxxxxxx
private async void slowMethod()
{
await doFirstLongRunningOperation();
await doSecondLongRunningOperation();
await doThirdLongRunningOperation();
message.Text = "Processing Complete";
}
//the method returns a Task!
private async Task doFirstLongRunningOperation()
{
Task task1 = Task.Run(() => { /* Task 1 code goes here */ });
Task task2 = Task.Run(() => { /* Task 2 code goes here */ });
await task1;
await task2;
}
private async Task doSecondLongRunningOperation()
{
Task task1 = Task.Run(() => { /* Task 1 code goes here */ });
Task task2 = Task.Run(() => { /* Task 2 code goes here */ });
await task1;
await task2;
}
private async Task doThirdLongRunningOperation()
{
Task task1 = Task.Run(() => { /* Task 1 code goes here */ });
Task task2 = Task.Run(() => { /* Task 2 code goes here */ });
await task1;
await task2;
}
24.1.2. async
methods return values
This refers to Task<TResult>.Result
Suppose you have a Task:
xxxxxxxxxx
Task<int> calculateValueTask = Task.Run(() => calculateValue(...));
private int calculateValue(...)
{
int someValue;
// Perform calculation and populate someValue
...
return someValue;
}
There are 2 ways you can run this method and get its value:
Solution 1🔨 Will block until the task complete❌
xxxxxxxxxx
int calculateData = calculateValueTask.Result;
Solution 2🔨 ✔️
xxxxxxxxxx
int calculateData = await calculateValueTask;
📌What is the difference?
Using Result
would block until the task had completed.
Using await
does the opposite - it unwraps a Task<T>
to a T
value. It won't block the thread!
24.1.3. async
method gotchas⚠️⭐️
1️⃣ async
does NOT 100% means method runs asynchronously ❌
2️⃣ async
does mean the method contains statements that may run asynchronously.
3️⃣ await
indicates a method should be run by a separate task. The calling code is suspended until the method call completes.
4️⃣ await
is NOT Wait
!⚠️ The former would not block, the latter would.
5️⃣ By default, the code that resumes execution after an await
operator attempts to obtain the original thread.
Use ConfigureAwait(false)
to specify code can be resumed on any available thread.
ConfigureAwait(true)
is default.
The following is a bad example❌:
xxxxxxxxxx
//This method
private async void slowMethod()
{
await doFirstLongRunningOperation().ConfigureAwait(false); //call back at any other thread
await doSecondLongRunningOperation().ConfigureAwait(false); //call back at any other thread
await doThirdLongRunningOperation().ConfigureAwait(false); //call back at any other thread
//this step must run in Main Thread(UI Thread)
message.Text = "Processing Complete";
}
6️⃣Careless use of asynchronous methods!!⚠️
xxxxxxxxxx
//suppose you have an async task
private async Task<string> generateResult()
{
string result;
...
result = ...
return result;
}
❌Wrong:
xxxxxxxxxx
//suppose you have a method related to UI operation
private async void myMethod()
{
var data = generateResult(); //you didn't await here
...
message.Text = $"result: {data.Result}"; //this will block the thread
}
🆗
xxxxxxxxxx
//suppose you have a method related to UI operation
private async void myMethod()
{
var data = generateResult(); //you didn't await here
...
message.Text = $"result: {await data}"; //this will block the thread
}
✔️my preference
xxxxxxxxxx
//suppose you have a method related to UI operation
private async void myMethod()
{
var data = await generateResult(); //you didn't await here
...
message.Text = $"result: {data}"; //this will block the thread
}
24.1.4. async
methods and the WinRT
APIs
The designer of Windows 8 and later versions wanted to ensure the application as responsive as possible. So they decided any operations might take over 50ms have to implement they async
API.
There are several methods can be called asynchronously.
📌Display Message
Displays the message and waits for the user to click the Close button.
xxxxxxxxxx
using Windows.UI.Popups;
...
MessageDialog dlg = new MessageDialog("Message to user");
await dlg.ShowAsync(); //wait for user to close
📌Select File
Display the files in the user’s Documents folder and wait while the user selects a single file from this list
xxxxxxxxxx
using Windows.Storage;
using Windows.Storage.Pickers;
...
FileOpenPicker fp = new FileOpenPicker();
fp.SuggestedStartLocation = PickerLocationId.DocumentsLibrary;
fp.ViewMode = PickerViewMode.List;
fp.FileTypeFilter.Add("*");
StorageFile file = await fp.PickSingleFileAsync(); //wait for user browsing
📌Open a File
Open a file in an asynchronous way:
xxxxxxxxxx
var fileStream = await file.OpenAsync(FileAccessMode.Read);
📌Render Pixels on Screen
The pixels can be seen as stream.
xxxxxxxxxx
Stream pixelStream = graphBitmap.PixelBuffer.AsStream();
pixelStream.Seek(0, SeekOrigin.Begin);
pixelStream.Write(data, 0, data.Length);
...
await pixelStream.WriteAsync(data, 0, data.Length);
...
24.1.5. Memory allocation with ValueTask
📌Some Resources on ValueTask
.NET Blog Understanding the Whys, Whats, and Whens of ValueTask
📌Case Study
Please have a look on the following method:
xxxxxxxxxx
public async Task<int> FindValueAsync(string key)
{
//1. attempt to find it locally
bool foundLocally = GetCachedValue(key, out int result);
if (foundLocally)
return result;
//2. if not, then try to do a long operation searching
result = await RetrieveValue(key); // possibly takes a long time
//3. add it to local Cache for next time
AddItemToLocalCache(key, result);
return result;
}
💡Pattern:
Cache-Aside
The preceding method uses Cache-Aside
pattern which load data on demand into a cache from a data store. This can improve performance and also helps to maintain consistency between data held in the cache and data in the underlying data store.
📈Analysis on this Method
In most cases, the work will be performed synchronously (it finds the data in cache). The data is an integer, but it is returned wrapped in a Task<int>
object. Compared to directly return an int
, the former requires much more time and memory allocation.
Return Type | Operation | Memory |
---|---|---|
Task<int> | 1️⃣Create obj 2️⃣Populate obj 3️⃣Retrieve the data | On Steap |
int | return directly | On Stack |
🔨Solution
Use ValueTask
which marshals the return value as a value type on stack rather than reference type on heap.
xxxxxxxxxx
//change the return type to ValueTask<T>
public async ValueTask<int> FindValueAsync(string key)
{
bool foundLocally = GetCachedValue(key, out int result);
if (foundLocally)
return result;
result = await RetrieveValue(key); // possibly takes a long time
AddItemToLocalCache(key, result);
return result;
}
📌Conclusion
Return ValueTask
only if the vast majority of the calls to an async
method are likely to be performed synchronously. a.k.a. Most of the time, the call will return before the await
operator. Otherwise, too much async
operation inside a ValueTask
can decrease the efficiency.
24.2. PLINQ to parallelize declarative data access⭐️⭐️
Use.AsParallel()
! The following are examples to perform PLINQ.
24.2.1. Learn PLINQ by example
📌Example 1
The first example is to filter numbers which are over 100.
Suppose you have an array called
numbers
xxxxxxxxxx
int[] numbers = new int[NUM];
Random random = new Random(999);
for (int i = 0; i < NUM; i++)
{
numbers[i] = random.Next(200);
}
You have a pseudo test method
In reality, the query methods always take time. Therefore, here we used Thread.SpinWait()
to execute "no operation" instruction for a period of time.
xxxxxxxxxx
public static bool TestIfTrue(bool expr)
{
Thread.SpinWait(1000);
return expr;
}
Normal LINQ - old school
xxxxxxxxxx
//Create a LINQ query
var over100Query = from num in numbers
where TestIfTrue(num > 100)
select num;
//The query actually runs here(time consuming)
List<int> over100 = new List<int>(over100Query);
Normal LINQ - new school
xxxxxxxxxx
//Declare and run in one sentence
List<int> over100 = numbers.Where(num => TestIfTrue(num > 100))
.Select(num => num)
.ToList();
PLINQ - old school😄
xxxxxxxxxx
//Create a LINQ query with Parallel!!
var over100Query = from num in numbers.AsParallel()
where TestIfTrue(num > 100)
select num;
//The query actually runs here(parallel!!)
List<int> over100 = new List<int>(over100Query);
PLINQ - new school😄
xxxxxxxxxx
List<int> over100 = numbers.AsParallel()
.Where(num => TestIfTrue(num > 100))
.Select(num => num)
.ToList();
📌Example 2
The second example is to create customer order info with 2 different sources, 1️⃣customers and 2️⃣orders.
Customers
A piece of customer info can be split by ,
into 6 parts which contain:
- Customer ID
- Customer's company
- Address
- City
- Country or region
- Postal code.
xxxxxxxxxx
//A pseudo in memory data representing customers info
public class CustomersInMemory
{
public static string[] Customers =
{
"ALFKI,Alfreds Futterkiste,Obere Str. 57,Berlin,Germany,12209",
"ANTON,Antonio Moreno Taquería,Mataderos 2312,México D.F.,Mexico,05023",
"BERGS,Berglunds snabbköp,Berguvsvägen 8,Luleå,Sweden,S-958 22",
"BLAUS,Blauer See Delikatessen,Forsterstr. 57,Mannheim,Germany,68306",
...
"WHITC,White Clover Markets,305 - 14th Ave. S. Suite 3B,Seattle,USA,98128",
"WILMK,Wilman Kala,Keskuskatu 45,Helsinki,Finland,21240",
"WOLZA,Wolski Zajazd,ul. Filtrowa 68,Warszawa,Poland,01-012"
};
}
Order
A piece of order info can be split by ,
into 2 parts which contain:
- Order ID
- Customer ID
- Date of the order
xxxxxxxxxx
//A pseudo in memory data representing order info
public class OrdersInMemory
{
public static string[] Orders =
{
"10248,VINET,Jul 4 1996 12:00AM",
"10249,TOMSP,Jul 5 1996 12:00AM",
"10250,HANAR,Jul 8 1996 12:00AM",
"11074,SIMOB,May 6 1998 12:00AM",
...
"11075,RICSU,May 6 1998 12:00AM",
"11076,BONAP,May 6 1998 12:00AM",
"11077,RATTC,May 6 1998 12:00AM"
}
}
Customer Order Info
Now, we need to create a Customer-Order Info by pairing the customer info and order info with their related key.
xxxxxxxxxx
//The new data structure looks something like this
public class CustomerOrderInfo
{
public string CustomerID { get; set; }
public string CompanyName { get; set; }
public int OrderID { get; set; }
public DateTime OrderDate { get; set; }
}
LINQ - Old School
xxxxxxxxxx
var customerOrderInfoQuery = from c in CustomersInMemory.Customers
join o in OrdersInMemory.Orders
on c.Split(',')[0] equals o.Split(',')[1]
select new CustomerOrderInfo
{
CustomerID = c.Split(',')[0],
CompanyName = c.Split(',')[1],
OrderID = Convert.ToInt32(o.Split(',')[0]),
OrderDate = Convert.ToDateTime(o.Split(',')[2],
new CultureInfo("en-US"))
};
List<CustomerOrderInfo> customerOrderInfo = new List<CustomerOrderInfo>(customerOrderInfoQuery);
LINQ - New School
xxxxxxxxxx
//declare and create the query in one sentence
var customerOrderInfo = CustomersInMemory.Customers.Join(
OrdersInMemory.Orders,
c => c.Split(',')[0],
o => o.Split(',')[1],
(c, o) => new CustomerOrderInfo
{
CustomerID = c.Split(',')[0],
CompanyName = c.Split(',')[1],
OrderID = Convert.ToInt32(o.Split(',')[0]),
OrderDate = Convert.ToDateTime(o.Split(',')[2],
new CultureInfo("en-US"))
}
).ToList();
PLINQ - old school😄
xxxxxxxxxx
var customerOrderInfoQuery = from c in CustomersInMemory.Customers.AsParallel()
join o in OrdersInMemory.Orders.AsParallel()
on c.Split(',')[0] equals o.Split(',')[1]
select new CustomerOrderInfo
{
CustomerID = c.Split(',')[0],
CompanyName = c.Split(',')[1],
OrderID = Convert.ToInt32(o.Split(',')[0]),
OrderDate = Convert.ToDateTime(o.Split(',')[2],
new CultureInfo("en-US"))
};
List<CustomerOrderInfo> customerOrderInfo = new List<CustomerOrderInfo>(customerOrderInfoQuery);
PLINQ - new school😄
xxxxxxxxxx
var customerOrderInfo = CustomersInMemory.Customers.AsParallel().Join(
OrdersInMemory.Orders.AsParallel(),
c => c.Split(',')[0],
o => o.Split(',')[1],
(c, o) => new CustomerOrderInfo
{
CustomerID = c.Split(',')[0],
CompanyName = c.Split(',')[1],
OrderID = Convert.ToInt32(o.Split(',')[0]),
OrderDate = Convert.ToDateTime(o.Split(',')[2],
new CultureInfo("en-US"))
}
).ToList();
Some Thought🤔
[2022/01/26]I used to code in the new school style. But recently I think... the old school is quite straight forward and relevant to English...
24.2.2. Canceling a PLINQ query
Very easy. Just take .WithCancellation()
xxxxxxxxxx
using CancellationTokenSource cts = new();
int[] results = null;
try
{
results =
(from num in source.AsParallel().WithCancellation(cts.Token)
where num % 3 == 0
orderby num descending
select num).ToArray();
}
catch{}
24.3. Synchronizing concurrent access to data
📌What is the risk during concurrent process?
If not doing correct, concurrent process might corrupt the data during overlapping operation.
📌Corrupt Data Example
❌
xxxxxxxxxx
static void ParallelTest()
{
int[] data = new int[NUMELEMENTS];
int j = 0; //variable j is outside of the Parallel.For scope
Parallel.For(0, NUMELEMENTS, (i) =>
{
j = i;
doAdditionalProcessing();
data[i] = j;
doMoreAdditionalProcessing();
});
for (int i = 0; i < NUMELEMENTS; i++)
{
Console.WriteLine($"Element {i} has value {data[i]}");
}
}
The preceding method simply records the current loop index into a shared variable j
, and store back the j
value to current index of the array.
This is WRONG!!! Try NOT to use shared variable in concurrent process!!!⚠️
24.3.1. lock
data
If you really need to use shared data in concurrent operation, then lock
is one of the choice.
📌What is lock
?
You can use lock
keyword to guarantee exclusive access[^12] to resources.
📌lock
example
xxxxxxxxxx
//you can use any reference type as a lock
//in convention, just use object is enough
object myLockObject = new object();
//...
lock(myLockObject)
{
// Code that requires exclusive access to a shared resource
//..
}
📌How does lock
work?
- 1️⃣ the
lock
statement attempts to obtain a mutual-exclusion lock - 2️⃣ once the 1st entered item have the lock, other threads will be blocked outside of the lock and wait
- 3️⃣ 1st entered item finished the job and left... the lock is open for another item
24.3.2. Synchronization primitives
Mutual exclusion lock is one of the locking techniques. In the following, we will introduce more techniques.
Overview of synchronization primitives
Synchronizing data for multithreading
📌Different function of locking techniques
- 1️⃣a single task has sole access to a resource, (simple exclusion lock)
- 2️⃣multiple tasks access a resource simultaneously with controlled manner, (semaphores)
- 3️⃣share read-only access to a resource simultaneously while guaranteeing exclusive access to modify the resource, (reader/writer locks)
Locking Techniques | Sole Access Read | Sole Access Write | Simultaneous Access Read | Simultaneous Access Write |
---|---|---|---|---|
simple exclusion lock | ✔️ | ✔️ | ❌ | ❌ |
semaphores | ❌ | ❌ | ✔️(controlled) | ✔️(controlled) |
reader/writer locks | ❌ | ✔️ | ✔️ | ❌ |
📌ManualResetEventSlim
Class⭐️
Fun Fact
ManualResetEventSlim
is the light weight version of ManualResetEvent
and that's why call it "Slim".
Function
ManualResetEventSlim
provides functionality by which one or more tasks can wait for an event.
How to use it?
An object of ManualResetEventSlim
can be 1 of 2 states: signaled (true) and unsignaled (false).
You can use Set()
to change unsignaled
to signal
.
You can use Reset()
to change signaled
to unsignaled
.
It is super similar to PLC connection!!⭐️
Example
xxxxxxxxxx
class Example
{
//instance of ManualResetEventSlim
static ManualResetEventSlim mreGotSignal;
//input from user
static string input = null;
//create a method started by thred 2
public static void GetUserInput()
{
Console.WriteLine("Waiting user's input...");
input = Console.ReadLine();
mreGotSignal?.Set(); //if got input, set the flag true
Console.WriteLine($"Received user's input and the flag of mreGotSignal is {mreGotSignal.IsSet}");
}
static void Main()
{
//init the mre instance to be false at first
mreGotSignal = new ManualResetEventSlim(false);
while (true)
{
Console.WriteLine("Start listening input...");
//running thread 2
Task.Run(GetUserInput);
Console.WriteLine("Main thread waiting mreGotSignal...");
mreGotSignal?.Wait(); //wait here for the signal from thread 2
Console.WriteLine($"Main thread signaled, received data: {input}");
Console.WriteLine("Rest flag for next round...");
mreGotSignal?.Reset();
}
}
}
📌SemaphoreSlim
Class
Function
Represents a lightweight alternative to Semaphore that limits the number of threads that can access a resource or pool of resources concurrently.
How to use it?
Init Semaphore
with the number of resources in the pool. public SemaphoreSlim(int initialCount, int maxCount)
- when access the resource, invoke the
Wait()
, the gate reduce - when quit accessing, invoke
Release()
, the gate increase
Example
xxxxxxxxxx
class Example
{
private static SemaphoreSlim semaphore;
private static int padding;
static void Main()
{
//Create the semaphore
//It means the max gates is 3, but right now the gate is 0
semaphore = new SemaphoreSlim(0, 3);
Console.WriteLine("{0} tasks can enter the semaphore.",
semaphore.CurrentCount);
Task[] tasks = new Task[5];
//Create and start 5 numbered tasks
for (int i = 0; i < 5; i++)
{
tasks[i] = Task.Run(() =>
{
//Each task begins by requesting the semaphore
Console.WriteLine("Task {0} begins and waits for the semaphore.",
Task.CurrentId);
int semaphoreCount;
semaphore.Wait();
//the code below will wait to start until semaphore is signaled
try
{
Interlocked.Add(ref padding, 100);
Console.WriteLine("Task {0} enters the semaphore.", Task.CurrentId);
//The task sleeps for 1+ sec
Thread.Sleep(1000 + padding);
}
finally
{
semaphoreCount = semaphore.Release();
}
Console.WriteLine("Task {0} releases the semaphore; previous count: {1}.",
Task.CurrentId, semaphore.CurrentCount);
});
}
// Wait for half a second, to allow all the tasks to start and block.
Thread.Sleep(500);
// (Open the gate) set the semaphore count to its maximum value.
Console.Write("Main thread calls Release(3) --> ");
semaphore.Release(3);
Console.WriteLine("{0} tasks can enter the semaphore.",
semaphore.CurrentCount);
// Main thread waits for the tasks to complete.
Task.WaitAll(tasks);
Console.WriteLine("Main thread exits.");
}
}
📌CountdownEvent
Class
Function
Represents a synchronization primitive that is signaled when its count reaches zero. You can think of the CountdownEvent
class as a cross between the inverse of SemaphoreSlim
and ManualResetEventSlim
.
Comparison
Why "the inverse"? Because CountdownEvent
blocks Task when value SemaphoreSlim
and ManualResetEventSlim
blocks Task when value
Example
xxxxxxxxxx
class Example
{
const int N = 10000;
static async Task Main()
{
//Init a queue and a CountdownEvent
ConcurrentQueue<int> queue = new ConcurrentQueue<int>(Enumerable.Range(0, N));
CountdownEvent cdE = new CountdownEvent(N); //initial count = 10000
//This is the logic for all queue consumers
Action consumer = () =>
{
int local;
while (queue.TryDequeue(out local))
{
cdE.Signal();
}
};
//Now empty the queue with a couple of asynchronous tasks
Task t1 = Task.Factory.StartNew(consumer);
Task t2 = Task.Factory.StartNew(consumer);
//Wait here for queue to empty by waiting on cdE
cdE.Wait(); //will return when cdE count reaches 0
Console.WriteLine("Done empty queue. InitialCount={0}, CurrentCount={1}, IsSet={2}",
cdE.InitialCount, cdE.CurrentCount, cdE.IsSet);
//Proper form is to wait for the tasks to complete, even though you know
//their work is done already.
await Task.WhenAll(t1, t2);
//Resetting will cause the CountdownEvent to unset, and reset both InitialCount
//and CurrentCount to the specified value
cdE.Reset(10);
// AddCount will affect the CurrentCount, but not the InitialCount
cdE.AddCount(2);
Console.WriteLine("After Reset(10), AddCount(2): InitialCount={0}, CurrentCount={1}, IsSet={2}",
cdE.InitialCount, cdE.CurrentCount, cdE.IsSet);
// Now try waiting with cancellation
CancellationTokenSource cts = new CancellationTokenSource();
cts.Cancel(); // cancels the CancellationTokenSource
try
{
cdE.Wait(cts.Token);
}
catch (OperationCanceledException)
{
Console.WriteLine("cde.Wait(preCanceledToken) threw OCE, as expected");
}
finally
{
cts.Dispose();
}
// It's good to release a CountdownEvent when you're done with it.
cdE.Dispose();
}
}
📌ReaderWriterLockSlim
Class⭐️
Function
Represents a lock that is used to manage access to a resource, allowing multiple threads for reading or exclusive access for writing.
How to use it?
When a Task
needs to read something, 1️⃣ EnterReadLock()
, after finished, 2️⃣ExitReadLock()
When a Task
needs to write something, 1️⃣ EnterWriteLock()
, after finished, 2️⃣ExitWriteLock()
Example
The following example includes simple methods to add to the cache, delete from the cache, and read from the cache. To demonstrate time-outs, the example includes a method that adds to the cache only if it can do so within a specified time-out.
xxxxxxxxxx
public class SynchronizedCache
{
//a private field - readerWriterSlim instance
private ReaderWriterLockSlim _cacheLock = new ReaderWriterLockSlim();
//a dictionary to be operated
private Dictionary<int, string> _innerCache = new Dictionary<int, string>();
//different status modifying dictionary
public enum AddOrUpdateStatus
{
Added,
Updated,
Unchanged
};
public int Count
{ get { return _innerCache.Count;} }
//read function
public string Read(int key)
{
_cacheLock.EnterReadLock();
try
{
return _innerCache[key];
}
finally
{
_cacheLock.ExitReadLock();
}
}
//add function
public void Add(int key, string value)
{
_cacheLock.EnterWriteLock();
try
{
_innerCache.Add(key, value);
}
finally
{
_cacheLock.ExitWriteLock();
}
}
//fancy add function, exit with timeout
public bool AddWithTimeout(int key, string value, int timeout)
{
if (_cacheLock.TryEnterWriteLock(timeout))
{
try
{
_innerCache.Add(key, value);
}
finally
{
_cacheLock.ExitWriteLock();
}
return true;
}
else
{
return false;
}
}
//fancy add function, if the "add" is really add or update or unchanged
public AddOrUpdateStatus AddOrUpdate(int key, string value)
{
_cacheLock.EnterUpgradeableReadLock();
try
{
string result = null;
if (_innerCache.TryGetValue(key, out result))
{
if (result == value)
{
return AddOrUpdateStatus.Unchanged;
}
else
{
_cacheLock.EnterWriteLock();
try
{
_innerCache[key] = value;
}
finally
{
_cacheLock.ExitWriteLock();
}
return AddOrUpdateStatus.Updated;
}
}
else
{
_cacheLock.EnterWriteLock();
try
{
_innerCache.Add(key, value);
}
finally
{
_cacheLock.ExitWriteLock();
}
return AddOrUpdateStatus.Added;
}
}
finally
{
_cacheLock.ExitUpgradeableReadLock();
}
}
//delete function
public void Delete(int key)
{
_cacheLock.EnterWriteLock();
try
{
_innerCache.Remove(key);
}
finally
{
_cacheLock.ExitWriteLock();
}
}
//deconstructor
~SynchronizedCache()
{
if (_cacheLock != null) _cacheLock.Dispose();
}
}
Now, we use the preceding class to code:
xxxxxxxxxx
static void Main()
{
var sc = new SynchronizedCache();
var tasks = new List<Task>();
int itemWritten = 0;
//Execute a writer
tasks.Add(Task.Run(() =>
{
String[] vegetables = { "broccoli", "cauliflower",
"carrot", "sorrel", "baby turnip",
"beet", "brussel sprout",
"cabbage", "plantain",
"spinach", "grape leaves",
"lime leaves", "corn",
"radish", "cucumber",
"raddichio", "lima beans" };
for (int i = 0; i < vegetables.Length; i++)
{
sc.Add(i, vegetables[i]);
}
itemWritten = vegetables.Length;
Console.WriteLine("Task {0} wrote {1} items\n",
Task.CurrentId, itemWritten);
}));
// Execute two readers,
// the first reader to read from first to last item in dict
// the second reader from last to first item in dict
for (int i = 0; i < 2; i++)
{
//there are 2 loop. 0 for ascending, 1 for descending
bool desc = (i == 1);
tasks.Add(Task.Run(() =>
{
int start, last, step, items;
do
{
string output = string.Empty;
items = sc.Count;
if (!desc)
{
start = 0;
step = 1;
last = items;
}
else
{
start = items;
step = -1;
last = 0;
}
for (int index = start; desc ? index > last : index < last; index += step)
{
output += String.Format("[{0}] ", sc.Read(index));
}
Console.WriteLine("Task {0} read {1} items: {2}\n",
Task.CurrentId, items, output);
} while (items < itemWritten | itemWritten == 0);
}));
}
//Execute a read/update task
tasks.Add(Task.Run(() =>
{
Thread.Sleep(100);
for (int i = 0; i < sc.Count; i++)
{
string value = sc.Read(i);
if (value == "cucumber")
{
if (sc.AddOrUpdate(i, "green been")!=SynchronizedCache.AddOrUpdateStatus.Unchanged)
{
Console.WriteLine("Changed 'cucumber' to 'green bean'");
}
}
}
}));
//Wait for all tasks to complete
Task.WaitAll(tasks.ToArray());
//Display the final content of the cache
Console.WriteLine();
Console.WriteLine("Values in SynchronizedCache: ");
for (int i = 0; i < sc.Count; i++)
{
Console.WriteLine(" {0}: {1}",i, sc.Read(i));
}
}
📌Barrier
Class
//TODO
24.3.3. Cancel synchronization
The ManualResetEventSlim
, SemaphoreSlim
, CountdownEvent
, and Barrier
classes all support cancellation.
xxxxxxxxxx
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = cancellationTokenSource.Token;
...
// Semaphore that protects a pool of 3 resources
SemaphoreSlim semaphoreSlim = new SemaphoreSlim(3);
...
// Wait on the semaphore, and catch the OperationCanceledException if
// another thread calls Cancel on cancellationTokenSource
try
{
semaphoreSlim.Wait(cancellationToken);
}
catch (OperationCanceledException oce)
{
...
}
24.3.4. Concurrent collection classes
📌When should you use?
If you consider synchronization primitives are not scalable enough and this manual process is potentially prone, you can use System.Collections.Concurrent
which is a small set of thread-safe collection classes and interfaces.
📌Notes before use
Adding thread safety to the methods in a collection class imposes additional run-time overhead, so these classes are not as fast as the regular collection classes.⚠️ So use it when you really need it.
📌What are they?
ConcurrentBag<T>
- description: a general-purpose class for holding an unordered collection of items
- function:
Add
,TryTake
,TryPeek
ConcurrentDictionary<TKey, TValue>
- description: a thread-safe version of the generic
Dictionary<TKey, TValue>
- function:
TryAdd
,ContainsKey
,TryGetValue
,TryRemove
- description: a thread-safe version of the generic
ConcurrentQueue<T>
- description: a thread-safe version of the generic
Queue<T>
class - function:
Enqueue
,TryDequeue
, andTryPeek
- description: a thread-safe version of the generic
ConcurrentStack<T>
- description: a thread-safe implementation of the generic
Stack<T>
- function:
Push
,TryPop
, andTryPeek
- description: a thread-safe implementation of the generic