Question: The 'peach' keyword in q allows for the user to run a function over multiple threads. The number of threads spawned is equal to the number of slaves enabled (-s flag commandline, \s runtime). Q distributes jobs to threads in round robin fashion at invocation. For example, with 2 threads, "f peach `a`b`c`d" will have `a and `c assigned to Thread 1, `b and `d assigned to Thread 2. This is not optimal because you can have the case where load is not balanced amongst threads, which will result in a state where some threads are working while others are idle. This will slow down the execution speed of the multi-threaded function, as the speed is bounded by the last thread to finish. Luckily, we can implement our own thread scheduling (written in C) such that no thread sits idle. Whenever a thread finishes execution, it will take on the next item. In this example, we will be implementing an optimized multi-threaded version of nanosleep. The function is defined as 'psleep' and takes in 2 K objects as arguments, a list and number denoting number of threads to use.
Example
q)\s 2
q)psleep:`threads 2: (`psleep;2) / load 'psleep' from shared library (threads.so)
q)sleep:`threads 2: (`nsleep;1) / load 'nsleep' from shared library (threads.so)
q)sleep peach 00:00:01 00:00:00.100n
0 0i
q)psleep[00:00:01 00:00:00.100n;2]
0 0i
q)\t sleep peach 00:00:01 00:00:00.100n
1000
q)\t psleep[00:00:01 00:00:00.100n;2]
1000
q)\t sleep peach 00:00:02 00:00:01 00:00:02 00:00:01n // thread 1 has heavier load
4000
q)\t psleep[00:00:02 00:00:01 00:00:02 00:00:01n;2] // same input as previous example but faster due to better load balancing
3001
q)\t sleep peach 00:00:01 00:00:01 00:00:02 00:00:02n
3001
q)\t psleep[00:00:01 00:00:01 00:00:02 00:00:02n;2] // performs about the same as peach for ideal ordering
3002
q)\t sleep peach 20#00:00:01 00:00:02n
20005
q)\t psleep[20#00:00:01 00:00:02n;2]
16007
######## solution.q ########
/ Bind the C entry points exported by threads.so; the number after each name matches the C function's argument count.
psleep:`threads 2: (`psleep;2) / load 'psleep' from shared library (threads.so): psleep[durations;threads]
sleep:`threads 2: (`nsleep;1) / load 'nsleep' from shared library (threads.so): sleep for a single duration
######## threads.c ########
// threads.c
#include <time.h>
#include <pthread.h>
#include <stdlib.h>
#include"k.h"
// Mutex under which run() updates the thread-slot table and currThreads.
// Statically initialised on purpose: POSIX leaves locking a mutex that was never
// initialised undefined, and zero-filled static storage is not guaranteed to be a
// valid mutex on every platform.
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// Number of worker threads currently in flight: raised by psleep() when it starts
// a worker and lowered by run() (while holding lock) when that worker is done.
int currThreads = 0;
// Work order for a single list item, handed to the run() thread routine.
struct arguments {
int threadNo; // slot of this worker in threadState[]
int itemNo; // position of the item in finished[] and in results
K e; // atom carrying the item's sleep duration in nanoseconds
int *threadState; // per-slot busy flags (1 = busy); run() writes 0 to slot threadNo when it is done
int *finished; // per-item completion flags; run() writes 1 to entry itemNo
K results; // int list that receives the sleep's return code at index itemNo
};
// Sleep for the duration held in a long or timespan atom (a nanosecond count).
//   nanos - atom of type -KJ or -KN.
// Returns an int atom holding nanosleep()'s result code (0 after a full sleep,
// -1 when the sleep was interrupted or the duration was rejected).
// Raises 'type for any other argument type.
K nsleep(K nanos){
    int type = nanos->t;
    if((type != -KJ) && (type != -KN)) return krr("type");
    struct timespec t;
    // Signed constants on purpose: with unsigned ones a negative duration is
    // converted to a huge positive value and the call sleeps for centuries;
    // kept signed, the negative fields make nanosleep() fail straight away.
    t.tv_sec = nanos->j / 1000000000LL;
    t.tv_nsec = nanos->j % 1000000000LL;
    return ki(nanosleep(&t, &t));
}
void* run(void *args) {
struct arguments *a = (struct arguments*)args;
kI(a->results)[a->itemNo] = nsleep(a->e)->j;
pthread_mutex_lock(&lock);
a->threadState[a->threadNo] = 0;
--currThreads;
pthread_mutex_unlock(&lock);
a->finished[a->itemNo] = 1;
free(a);
return 0;
}
// parallel nanosleep
K psleep(K list, K n) {
int type = list->t;
if(type < 0) return nsleep(list); // running psleep on atom
if((type != KJ) && (type != KN)) return krr("type");
int numItems = list->j;
int maxThreads = n->j;
if(maxThreads > numItems) maxThreads = numItems;
int finished[numItems], processed[numItems], threadState[maxThreads];
K results = ktn(KI, numItems);
for(int i = 0; i < numItems; ++i){
finished[i] = 0;
processed[i] = 0;
}
for(int i = 0; i < maxThreads; ++i) threadState[i] = 0;
pthread_t tid[maxThreads];
while(1) {
int done = 1;
for(int i = 0; i < numItems; ++i) {
if(finished[i] == 0) {
done = 0;
break;
}
}
if(done) break;
if(currThreads >= maxThreads) continue;
int threadNo = -1;
for (int i = 0; i < maxThreads; ++i) {
if(threadState[i] == 0) {
threadNo = i;
break;
}
}
if(threadNo != -1) {
int itemNo = -1;
for(int i = 0; i < numItems; ++i) {
if(processed[i] == 0) {
itemNo = i;
break;
}
}
if(itemNo != -1) {
struct arguments *args = malloc(sizeof(struct arguments));
args->threadNo = threadNo;
args->itemNo = itemNo;
args->e = kj(kJ(list)[itemNo]);
args->threadState = threadState;
args->finished = finished;
args->results = results;
threadState[threadNo] = 1;
processed[itemNo] = 1;
++currThreads;
pthread_create(&tid[threadNo], NULL, run, (void *)args);
}
}
}
return results;
}
// generate shared object
// gcc -shared -fPIC -pthread -DKXVER=3 threads.c -o threads.so -Wall
Explanation: In the 'psleep' function, create variables to keep the state of the threads, the items, and the result. Loop until all items are completed. To schedule an item for processing on a thread, the following conditions must be met: there is an idle thread available and there is an item that has not been processed yet. When an item is scheduled, the thread state changes to busy (denoted as 1) and the processed state for the item changes to true (denoted as 1). To pass shared state to the threads, we use a mixture of global variables and pointers that are passed to the start routine ('run' in this case). In the 'run' routine, perform the sleep for the item and store its return code in the result list at that item's index. Acquire the mutex (to prevent concurrent updates) and set the thread state back to idle (denoted as 0), so the thread can be given another item. Update the finished state for this item to completed (denoted as 1). Once all items have been completed, the loop ends and the result list is returned to q.