xref: /NextBSD/lib/libdispatch/src/apply.c (revision 33da5adc555b3bc29986eeadca03829e4ad06b1e)
/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
20 
21 #include "internal.h"
22 
/*
 * Signature of a dispatch_apply() work function: it is handed the client
 * context pointer and the index of the iteration to perform.
 */
typedef void (*dispatch_apply_function_t)(void *ctxt, size_t idx);
24 
25 DISPATCH_ALWAYS_INLINE
26 static inline void
_dispatch_apply_invoke2(void * ctxt,bool redirect)27 _dispatch_apply_invoke2(void *ctxt, bool redirect)
28 {
29 	dispatch_apply_t da = (dispatch_apply_t)ctxt;
30 	size_t const iter = da->da_iterations;
31 	size_t idx, done = 0;
32 
33 	idx = dispatch_atomic_inc_orig2o(da, da_index, acquire);
34 	if (!fastpath(idx < iter)) goto out;
35 
36 	// da_dc is only safe to access once the 'index lock' has been acquired
37 	dispatch_apply_function_t const func = (void *)da->da_dc->dc_func;
38 	void *const da_ctxt = da->da_dc->dc_ctxt;
39 	dispatch_queue_t dq = da->da_dc->dc_data;
40 
41 	_dispatch_perfmon_workitem_dec(); // this unit executes many items
42 
43 	// Handle nested dispatch_apply rdar://problem/9294578
44 	size_t nested = (size_t)_dispatch_thread_getspecific(dispatch_apply_key);
45 	_dispatch_thread_setspecific(dispatch_apply_key, (void*)da->da_nested);
46 
47 	dispatch_queue_t old_dq;
48 	pthread_priority_t old_dp;
49 	if (redirect) {
50 		old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
51 		_dispatch_thread_setspecific(dispatch_queue_key, dq);
52 		old_dp = _dispatch_set_defaultpriority(dq->dq_priority);
53 	}
54 
55 	// Striding is the responsibility of the caller.
56 	do {
57 		_dispatch_client_callout2(da_ctxt, idx, func);
58 		_dispatch_perfmon_workitem_inc();
59 		done++;
60 		idx = dispatch_atomic_inc_orig2o(da, da_index, relaxed);
61 	} while (fastpath(idx < iter));
62 
63 	if (redirect) {
64 		_dispatch_reset_defaultpriority(old_dp);
65 		_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
66 	}
67 	_dispatch_thread_setspecific(dispatch_apply_key, (void*)nested);
68 
69 	// The thread that finished the last workitem wakes up the possibly waiting
70 	// thread that called dispatch_apply. They could be one and the same.
71 	if (!dispatch_atomic_sub2o(da, da_todo, done, release)) {
72 		_dispatch_thread_semaphore_signal(da->da_sema);
73 	}
74 out:
75 	if (dispatch_atomic_dec2o(da, da_thr_cnt, release) == 0) {
76 		_dispatch_continuation_free((dispatch_continuation_t)da);
77 	}
78 }
79 
80 DISPATCH_NOINLINE
81 void
_dispatch_apply_invoke(void * ctxt)82 _dispatch_apply_invoke(void *ctxt)
83 {
84 	_dispatch_apply_invoke2(ctxt, false);
85 }
86 
87 DISPATCH_NOINLINE
88 void
_dispatch_apply_redirect_invoke(void * ctxt)89 _dispatch_apply_redirect_invoke(void *ctxt)
90 {
91 	_dispatch_apply_invoke2(ctxt, true);
92 }
93 
94 static void
_dispatch_apply_serial(void * ctxt)95 _dispatch_apply_serial(void *ctxt)
96 {
97 	dispatch_apply_t da = (dispatch_apply_t)ctxt;
98 	dispatch_continuation_t dc = da->da_dc;
99 	size_t const iter = da->da_iterations;
100 	size_t idx = 0;
101 
102 	_dispatch_perfmon_workitem_dec(); // this unit executes many items
103 	do {
104 		_dispatch_client_callout2(dc->dc_ctxt, idx, (void*)dc->dc_func);
105 		_dispatch_perfmon_workitem_inc();
106 	} while (++idx < iter);
107 
108 	_dispatch_continuation_free((dispatch_continuation_t)da);
109 }
110 
111 DISPATCH_ALWAYS_INLINE
112 static inline void
_dispatch_apply_f2(dispatch_queue_t dq,dispatch_apply_t da,dispatch_function_t func)113 _dispatch_apply_f2(dispatch_queue_t dq, dispatch_apply_t da,
114 		dispatch_function_t func)
115 {
116 	uint32_t i = 0;
117 	dispatch_continuation_t head = NULL, tail = NULL;
118 
119 	// The current thread does not need a continuation
120 	uint32_t continuation_cnt = da->da_thr_cnt - 1;
121 
122 	dispatch_assert(continuation_cnt);
123 
124 	for (i = 0; i < continuation_cnt; i++) {
125 		dispatch_continuation_t next = _dispatch_continuation_alloc();
126 		next->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
127 		next->dc_func = func;
128 		next->dc_ctxt = da;
129 		_dispatch_continuation_voucher_set(next, 0);
130 		_dispatch_continuation_priority_set(next, 0, 0);
131 
132 		next->do_next = head;
133 		head = next;
134 
135 		if (!tail) {
136 			tail = next;
137 		}
138 	}
139 
140 	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
141 	da->da_sema = sema;
142 
143 	_dispatch_queue_push_list(dq, head, tail, head->dc_priority,
144 			continuation_cnt);
145 	// Call the first element directly
146 	_dispatch_apply_invoke(da);
147 	_dispatch_perfmon_workitem_inc();
148 
149 	_dispatch_thread_semaphore_wait(sema);
150 	_dispatch_put_thread_semaphore(sema);
151 
152 }
153 
154 static void
_dispatch_apply_redirect(void * ctxt)155 _dispatch_apply_redirect(void *ctxt)
156 {
157 	dispatch_apply_t da = (dispatch_apply_t)ctxt;
158 	uint32_t da_width = 2 * (da->da_thr_cnt - 1);
159 	dispatch_queue_t dq = da->da_dc->dc_data, rq = dq, tq;
160 
161 	do {
162 		uint32_t running, width = rq->dq_width;
163 		running = dispatch_atomic_add2o(rq, dq_running, da_width, relaxed);
164 		if (slowpath(running > width)) {
165 			uint32_t excess = width > 1 ? running - width : da_width;
166 			for (tq = dq; 1; tq = tq->do_targetq) {
167 				(void)dispatch_atomic_sub2o(tq, dq_running, excess, relaxed);
168 				if (tq == rq) {
169 					break;
170 				}
171 			}
172 			da_width -= excess;
173 			if (slowpath(!da_width)) {
174 				return _dispatch_apply_serial(da);
175 			}
176 			da->da_thr_cnt -= excess / 2;
177 		}
178 		rq = rq->do_targetq;
179 	} while (slowpath(rq->do_targetq));
180 	_dispatch_apply_f2(rq, da, _dispatch_apply_redirect_invoke);
181 	do {
182 		(void)dispatch_atomic_sub2o(dq, dq_running, da_width, relaxed);
183 		dq = dq->do_targetq;
184 	} while (slowpath(dq->do_targetq));
185 }
186 
187 #define DISPATCH_APPLY_MAX UINT16_MAX // must be < sqrt(SIZE_MAX)
188 
189 DISPATCH_NOINLINE
190 void
dispatch_apply_f(size_t iterations,dispatch_queue_t dq,void * ctxt,void (* func)(void *,size_t))191 dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
192 		void (*func)(void *, size_t))
193 {
194 	if (slowpath(iterations == 0)) {
195 		return;
196 	}
197 	uint32_t thr_cnt = dispatch_hw_config(active_cpus);
198 	size_t nested = (size_t)_dispatch_thread_getspecific(dispatch_apply_key);
199 	if (!slowpath(nested)) {
200 		nested = iterations;
201 	} else {
202 		thr_cnt = nested < thr_cnt ? thr_cnt / nested : 1;
203 		nested = nested < DISPATCH_APPLY_MAX && iterations < DISPATCH_APPLY_MAX
204 				? nested * iterations : DISPATCH_APPLY_MAX;
205 	}
206 	if (iterations < thr_cnt) {
207 		thr_cnt = (uint32_t)iterations;
208 	}
209 	struct dispatch_continuation_s dc = {
210 		.dc_func = (void*)func,
211 		.dc_ctxt = ctxt,
212 	};
213 	dispatch_apply_t da = (typeof(da))_dispatch_continuation_alloc();
214 	da->da_index = 0;
215 	da->da_todo = iterations;
216 	da->da_iterations = iterations;
217 	da->da_nested = nested;
218 	da->da_thr_cnt = thr_cnt;
219 	da->da_dc = &dc;
220 
221 	dispatch_queue_t old_dq;
222 	old_dq = (dispatch_queue_t)_dispatch_thread_getspecific(dispatch_queue_key);
223 	if (slowpath(dq == DISPATCH_APPLY_CURRENT_ROOT_QUEUE)) {
224 		dq = old_dq ? old_dq : _dispatch_get_root_queue(
225 				_DISPATCH_QOS_CLASS_DEFAULT, false);
226 		while (slowpath(dq->do_targetq)) {
227 			dq = dq->do_targetq;
228 		}
229 	}
230 	if (slowpath(dq->dq_width <= 2) || slowpath(thr_cnt <= 1)) {
231 		return dispatch_sync_f(dq, da, _dispatch_apply_serial);
232 	}
233 	if (slowpath(dq->do_targetq)) {
234 		if (slowpath(dq == old_dq)) {
235 			return dispatch_sync_f(dq, da, _dispatch_apply_serial);
236 		} else {
237 			dc.dc_data = dq;
238 			return dispatch_sync_f(dq, da, _dispatch_apply_redirect);
239 		}
240 	}
241 	_dispatch_thread_setspecific(dispatch_queue_key, dq);
242 	_dispatch_apply_f2(dq, da, _dispatch_apply_invoke);
243 	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
244 }
245 
246 #ifdef __BLOCKS__
#if DISPATCH_COCOA_COMPAT
/*
 * GC-compatible path for dispatch_apply().  It runs the apply on a
 * Block_copy()ed version of work and releases that copy once
 * dispatch_apply_f() has returned.
 */
DISPATCH_NOINLINE
static void
_dispatch_apply_slow(size_t iterations, dispatch_queue_t dq,
		void (^work)(size_t))
{
	dispatch_block_t const heap_block = _dispatch_Block_copy((void *)work);
	dispatch_apply_function_t const invoke =
			(dispatch_apply_function_t)_dispatch_Block_invoke(heap_block);

	dispatch_apply_f(iterations, dq, heap_block, invoke);
	Block_release(heap_block);
}
#endif
259 
260 void
261 dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
262 {
263 #if DISPATCH_COCOA_COMPAT
264 	// Under GC, blocks transferred to other threads must be Block_copy()ed
265 	// rdar://problem/7455071
266 	if (dispatch_begin_thread_4GC) {
267 		return _dispatch_apply_slow(iterations, dq, work);
268 	}
269 #endif
270 	dispatch_apply_f(iterations, dq, work,
271 			(dispatch_apply_function_t)_dispatch_Block_invoke(work));
272 }
273 #endif
274 
275 #if 0
#ifdef __BLOCKS__
/*
 * Block-based variant of dispatch_stride_f(): passes work as the context and
 * work's invoke function as the work function.
 */
void
dispatch_stride(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void (^work)(size_t))
{
	dispatch_apply_function_t const invoke =
			(dispatch_apply_function_t)_dispatch_Block_invoke(work);
	dispatch_stride_f(offset, stride, iterations, dq, work, invoke);
}
#endif
285 
286 DISPATCH_NOINLINE
287 void
288 dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
289 		dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
290 {
291 	if (stride == 0) {
292 		stride = 1;
293 	}
294 	dispatch_apply(iterations / stride, queue, ^(size_t idx) {
295 		size_t i = idx * stride + offset;
296 		size_t stop = i + stride;
297 		do {
298 			func(ctxt, i++);
299 		} while (i < stop);
300 	});
301 
302 	dispatch_sync(queue, ^{
303 		size_t i;
304 		for (i = iterations - (iterations % stride); i < iterations; i++) {
305 			func(ctxt, i + offset);
306 		}
307 	});
308 }
309 #endif
310