Paradox Game Engine  v1.0.0 beta06
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Events Macros Pages
Scheduler.cs
Go to the documentation of this file.
1 // Copyright (c) 2014 Silicon Studio Corp. (http://siliconstudio.co.jp)
2 // This file is distributed under GPL v3. See LICENSE.md for details.
3 using System;
4 using System.Collections.Generic;
5 using System.Linq;
6 using System.Runtime.ExceptionServices;
7 using System.Threading;
8 using System.Threading.Tasks;
9 using SiliconStudio.Core.Collections;
10 using SiliconStudio.Core.Diagnostics;
11 
12 namespace SiliconStudio.Core.MicroThreading
13 {
14  /// <summary>
15  /// Scheduler that manage a group of cooperating <see cref="MicroThread"/>.
16  /// </summary>
17  /// <remarks>
18  /// Microthreading provides a way to execute many small execution contexts who cooperatively yield to each others.
19  /// </remarks>
20  public class Scheduler
21  {
/// <summary>
/// Logger used by the scheduler, obtained from <see cref="GlobalLogger"/> under the name "Scheduler".
/// </summary>
internal static Logger Log = GlobalLogger.GetLogger("Scheduler");

// An ever-increasing counter that will be used to have a "stable" microthread scheduling (first added is first scheduled)
internal long SchedulerCounter;

// Micro threads waiting to be executed. Run() dequeues from it while holding a lock on the queue object itself.
internal PriorityNodeQueue<MicroThread> scheduledMicroThreads = new PriorityNodeQueue<MicroThread>();

// Backing list returned by the MicroThreads property.
internal LinkedList<MicroThread> allMicroThreads = new LinkedList<MicroThread>();

// One slot per OS thread: the micro thread whose callback Run() is currently executing on that thread.
// Never reassigned after construction, hence readonly.
private readonly ThreadLocal<MicroThread> runningMicroThread = new ThreadLocal<MicroThread>();

/// <summary>
/// Raised by <see cref="Run"/> just before executing a callback of a micro thread whose state is <c>Starting</c>.
/// </summary>
public event EventHandler<SchedulerThreadEventArgs> MicroThreadStarted;

/// <summary>
/// Raised by <see cref="Run"/> after a callback when the micro thread reports that it is over.
/// </summary>
public event EventHandler<SchedulerThreadEventArgs> MicroThreadEnded;

/// <summary>
/// Raised by <see cref="Run"/> right before a micro thread callback is executed, and again for the
/// enclosing micro thread when a nested <see cref="Run"/> hands control back to it.
/// </summary>
public event EventHandler<SchedulerThreadEventArgs> MicroThreadCallbackStart;

/// <summary>
/// Raised by <see cref="Run"/> right after a micro thread callback has executed, and for the
/// enclosing micro thread when a nested <see cref="Run"/> is about to execute another callback.
/// </summary>
public event EventHandler<SchedulerThreadEventArgs> MicroThreadCallbackEnd;
37 
/// <summary>
/// Initializes a new instance of the <see cref="Scheduler" /> class.
/// </summary>
/// <remarks>
/// Creates the frame channel with its preference set to <see cref="ChannelPreference.PreferSender"/>.
/// </remarks>
public Scheduler()
{
    var frameChannel = new Channel<int>();
    frameChannel.Preference = ChannelPreference.PreferSender;
    FrameChannel = frameChannel;
}
45 
/// <summary>
/// Gets the micro thread that <see cref="Run"/> is currently executing on the calling thread.
/// </summary>
/// <value>
/// The current running micro thread in this scheduler. The value is stored per thread
/// (it is read from a <see cref="ThreadLocal{T}"/>).
/// </value>
public MicroThread RunningMicroThread
{
    get
    {
        return runningMicroThread.Value;
    }
}
54 
/// <summary>
/// Gets the scheduler that owns the micro thread returned by <see cref="CurrentMicroThread"/>.
/// </summary>
/// <value>
/// The scheduler associated with the current micro thread, or <c>null</c> when there is no current micro thread.
/// </value>
public static Scheduler Current
{
    get
    {
        var self = CurrentMicroThread;
        if (self == null)
            return null;

        return self.Scheduler;
    }
}
67 
/// <summary>
/// Gets the list of every non-stopped micro threads.
/// </summary>
/// <value>The list of every non-stopped micro threads (the scheduler's internal list, not a copy).</value>
public IEnumerable<MicroThread> MicroThreads
{
    get
    {
        return allMicroThreads;
    }
}

/// <summary>
/// Gets the channel that <see cref="NextFrame"/> receives from and that <see cref="Run"/> sends to
/// once its queue of scheduled micro threads is empty.
/// </summary>
protected Channel<int> FrameChannel { get; private set; }
78 
/// <summary>
/// Gets the current micro thread (self).
/// </summary>
/// <value>
/// The micro thread attached to the current <see cref="SynchronizationContext"/> when that context is a
/// <see cref="MicroThreadSynchronizationContext"/>; otherwise <c>null</c>.
/// </value>
public static MicroThread CurrentMicroThread
{
    get
    {
        var context = SynchronizationContext.Current as MicroThreadSynchronizationContext;
        if (context == null)
            return null;

        return context.MicroThread;
    }
}
91 
92 
/// <summary>
/// Yields execution.
/// If any other micro thread is pending, it will be run now and current micro thread will be scheduled as last.
/// </summary>
/// <returns>An awaiter created for <see cref="CurrentMicroThread"/>.</returns>
/// <remarks>
/// NOTE(review): <see cref="CurrentMicroThread"/> is null outside of a micro thread context; how the
/// awaiter handles a null micro thread is not visible here, so confirm before calling this from
/// regular threads.
/// </remarks>
public static MicroThreadYieldAwaiter Yield()
{
    return new MicroThreadYieldAwaiter(CurrentMicroThread);
}
102 
/// <summary>
/// Yields execution until next frame.
/// </summary>
/// <returns>The awaiter returned by receiving on the frame channel.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown when there is no current micro thread, i.e. the call is made outside of a micro thread context.
/// </exception>
/// <remarks>
/// The frame channel is fed by <see cref="Run"/>, which sends on it after it has drained its queue of
/// scheduled micro threads. Presumably <see cref="Run"/> is invoked once per frame by the host; verify
/// against the caller.
/// </remarks>
public ChannelMicroThreadAwaiter<int> NextFrame()
{
    // InvalidOperationException derives from Exception, so existing catch (Exception) handlers still apply.
    if (MicroThread.Current == null)
        throw new InvalidOperationException("NextFrame cannot be called out of the micro-thread context.");

    return FrameChannel.Receive();
}
114 
/// <summary>
/// Runs until no runnable tasklets left.
/// This function is reentrant.
/// </summary>
/// <remarks>
/// Each iteration dequeues one scheduled micro thread, installs its synchronization context, executes its
/// pending callback and then restores the previous context and the previously running micro thread.
/// An exception thrown by the callback is logged and stored on the micro thread. If the micro thread ends
/// in the <c>Failed</c> state with no completion task registered, and it is not flagged
/// <see cref="MicroThreadFlags.IgnoreExceptions"/>, its exception is rethrown to the caller of this method.
/// Once the queue is empty, the frame channel is sent a value for as long as its balance is negative.
/// </remarks>
public void Run()
{
#if SILICONSTUDIO_PLATFORM_WINDOWS_RUNTIME
    int managedThreadId = 0;
#else
    int managedThreadId = Thread.CurrentThread.ManagedThreadId;
#endif

    while (true)
    {
        Action callback;
        MicroThread microThread;
        lock (scheduledMicroThreads)
        {
            if (scheduledMicroThreads.Count == 0)
                break;
            microThread = scheduledMicroThreads.Dequeue();

            // Take ownership of the pending callback so that it is executed only once.
            callback = microThread.Callback;
            microThread.Callback = null;
        }

        // Since it can be reentrant, it should be restored after running the callback.
        var previousRunningMicrothread = runningMicroThread.Value;
        if (previousRunningMicrothread != null)
            RaiseThreadEvent(MicroThreadCallbackEnd, previousRunningMicrothread, managedThreadId);

        runningMicroThread.Value = microThread;
        var previousSyncContext = SynchronizationContext.Current;
        SynchronizationContext.SetSynchronizationContext(microThread.SynchronizationContext);
        try
        {
            if (microThread.State == MicroThreadState.Starting)
                RaiseThreadEvent(MicroThreadStarted, microThread, managedThreadId);

            RaiseThreadEvent(MicroThreadCallbackStart, microThread, managedThreadId);

            using (Profiler.Begin(microThread.ProfilingKey))
            {
                callback();
            }
        }
        catch (Exception e)
        {
            Log.Error("Unexpected exception while executing a micro-thread", e);
            microThread.SetException(e);
        }
        finally
        {
            try
            {
                RaiseThreadEvent(MicroThreadCallbackEnd, microThread, managedThreadId);

                SynchronizationContext.SetSynchronizationContext(previousSyncContext);
                if (microThread.IsOver)
                {
                    lock (microThread.AllLinkedListNode)
                    {
                        if (microThread.CompletionTask != null)
                        {
                            if (microThread.State == MicroThreadState.Failed || microThread.State == MicroThreadState.Cancelled)
                                microThread.CompletionTask.TrySetException(microThread.Exception);
                            else
                                microThread.CompletionTask.TrySetResult(1);
                        }
                        else if (microThread.State == MicroThreadState.Failed && microThread.Exception != null)
                        {
                            // Nothing was listening on the micro thread and it crashed
                            // Let's treat it as unhandled exception and propagate it
                            // Use ExceptionDispatchInfo.Capture to not overwrite callstack
                            if ((microThread.Flags & MicroThreadFlags.IgnoreExceptions) != MicroThreadFlags.IgnoreExceptions)
                                ExceptionDispatchInfo.Capture(microThread.Exception).Throw();
                        }

                        RaiseThreadEvent(MicroThreadEnded, microThread, managedThreadId);
                    }
                }
            }
            finally
            {
                // Must run even when the block above throws (unobserved micro thread failure, or a
                // faulty event handler): otherwise the thread-local slot would keep pointing at the
                // micro thread that just ran, and the outer micro thread of a reentrant call would
                // never receive the CallbackStart matching the CallbackEnd raised before the switch.
                runningMicroThread.Value = previousRunningMicrothread;
                if (previousRunningMicrothread != null)
                    RaiseThreadEvent(MicroThreadCallbackStart, previousRunningMicrothread, managedThreadId);
            }
        }
    }

    // Release receivers waiting on the frame channel (see NextFrame).
    while (FrameChannel.Balance < 0)
        FrameChannel.Send(0);
}

// Raises a scheduler event if it has subscribers. Receiving the delegate as an argument takes a snapshot
// of it, so a handler unsubscribing concurrently cannot turn it into null between the check and the call.
private void RaiseThreadEvent(EventHandler<SchedulerThreadEventArgs> handler, MicroThread microThread, int managedThreadId)
{
    if (handler != null)
        handler(this, new SchedulerThreadEventArgs(microThread, managedThreadId));
}
213 
/// <summary>
/// Creates a micro thread out of the specified function and schedules it as last micro thread to run in this scheduler.
/// Note that in case of multithreaded scheduling, it might start before this function returns.
/// </summary>
/// <param name="microThreadFunction">The function the new micro thread is started with.</param>
/// <param name="flags">The flags passed to the new micro thread.</param>
/// <returns>The newly created micro thread, after it has been started.</returns>
public MicroThread Add(Func<Task> microThreadFunction, MicroThreadFlags flags = MicroThreadFlags.None)
{
    var created = new MicroThread(this, flags);

    created.Start(microThreadFunction);

    return created;
}
227 
/// <summary>
/// Creates a new empty micro thread, that could later be started with <see cref="MicroThread.Start"/>.
/// </summary>
/// <returns>A new micro thread bound to this scheduler; it is not started.</returns>
public MicroThread Create()
{
    return new MicroThread(this);
}
236 
/// <summary>
/// Task that completes when all the specified micro thread executions are completed.
/// </summary>
/// <param name="microThreads">The micro threads to wait for.</param>
/// <returns>A task that completes once every micro thread that was still running has ended.</returns>
/// <exception cref="ArgumentNullException"><paramref name="microThreads"/> is null.</exception>
/// <exception cref="AggregateException">
/// At least one micro thread was already failed or cancelled when this method was called, or one of the
/// awaited micro threads ended with an exception.
/// </exception>
public async Task WhenAll(params MicroThread[] microThreads)
{
    // lock (null) would throw the same exception type; make the failure explicit and name the parameter.
    if (microThreads == null)
        throw new ArgumentNullException("microThreads");

    Task<int>[] continuationTasks;

    // Need additional checks: Not sure if we should switch to return a Task and set it before returning it.
    // It should continue execution right away (no execution flow yielding).
    lock (microThreads)
    {
        if (microThreads.All(x => x.State == MicroThreadState.Completed))
            return;

        if (microThreads.Any(x => x.State == MicroThreadState.Failed || x.State == MicroThreadState.Cancelled))
            throw new AggregateException(microThreads.Select(x => x.Exception).Where(x => x != null));

        var completionTasks = new List<Task<int>>();
        foreach (var thread in microThreads)
        {
            if (!thread.IsOver)
            {
                // The completion source is created lazily, under the same lock Run() takes before completing it.
                lock (thread.AllLinkedListNode)
                {
                    if (thread.CompletionTask == null)
                        thread.CompletionTask = new TaskCompletionSource<int>();
                }
                completionTasks.Add(thread.CompletionTask.Task);
            }
        }

        continuationTasks = completionTasks.ToArray();
    }

    // Every micro thread may have ended between the state checks above and the loop; ContinueWhenAll
    // rejects an empty array with an ArgumentException, and there is nothing left to wait for anyway.
    if (continuationTasks.Length == 0)
        return;

    // Force tasks exception to be checked and propagated
    await Task.Factory.ContinueWhenAll(continuationTasks, tasks => Task.WaitAll(tasks));
}
277 
278  }
279 }
EventHandler< SchedulerThreadEventArgs > MicroThreadEnded
Definition: Scheduler.cs:33
EventHandler< SchedulerThreadEventArgs > MicroThreadCallbackStart
Definition: Scheduler.cs:35
Represents an execution context managed by a Scheduler, that can cooperatively yield execution to ano...
Definition: MicroThread.cs:16
_In_ size_t _In_ DXGI_FORMAT _In_ size_t _In_ DXGI_FORMAT _In_ DWORD flags
Definition: DirectXTexP.h:170
ChannelMicroThreadAwaiter< int > NextFrame()
Yields execution until next frame.
Definition: Scheduler.cs:107
MicroThread Add(Func< Task > microThreadFunction, MicroThreadFlags flags=MicroThreadFlags.None)
Creates a micro thread out of the specified function and schedules it as last micro thread to run in ...
Definition: Scheduler.cs:221
System.Threading.Tasks.Task Task
Base implementation for ILogger.
Definition: Logger.cs:10
Exception Exception
Gets the exception that was thrown by this MicroThread.
Definition: MicroThread.cs:133
void Run()
Runs until no runnable tasklets left. This function is reentrant.
Definition: Scheduler.cs:119
static MicroThread Current
Gets the current micro thread (self).
Definition: MicroThread.cs:181
Scheduler()
Initializes a new instance of the Scheduler class.
Definition: Scheduler.cs:41
MicroThreadFlags Flags
Gets the MicroThread flags.
Definition: MicroThread.cs:141
MicroThread Create()
Creates a new empty micro thread, that could later be started with MicroThread.Start.
Definition: Scheduler.cs:232
async Task WhenAll(params MicroThread[] microThreads)
Task that completes when all MicroThread executions are completed.
Definition: Scheduler.cs:242
EventHandler< SchedulerThreadEventArgs > MicroThreadCallbackEnd
Definition: Scheduler.cs:36
Scheduler that manages a group of cooperating MicroThread.
Definition: Scheduler.cs:20
MicroThreadState State
Gets the state of this MicroThread.
Definition: MicroThread.cs:126
static MicroThreadYieldAwaiter Yield()
Yields execution. If any other micro thread is pending, it will be run now and current micro thread w...
Definition: Scheduler.cs:98
EventHandler< SchedulerThreadEventArgs > MicroThreadStarted
Definition: Scheduler.cs:32
Provides data for the Scheduler.MicroThreadStarted, Scheduler.MicroThreadEnded, Scheduler.MicroThreadCallbackStart and Scheduler.MicroThreadCallbackEnd events.
Output message to log right away.
bool IsOver
Indicates whether the MicroThread is terminated or not, either in Completed, Cancelled or Failed stat...
Definition: MicroThread.cs:166