Successful multithreading/multiprocessing in the BGE using PyOpenCL or popen

It has come to my attention that various people are having difficulty getting multithreading to work in the BGE. I have done it successfully on two occasions, using two different techniques. The first uses PyOpenCL to perform procedural heightmap creation on the video card. The second uses popen on Blender’s Python executable to run a non-blocking networking server/client. In neither case can the subprocess access the bge module directly. It might be possible to do something with ctypes and memory pointers, or with numpy arrays built from bytes, but that is currently beyond me, so if it can be done I don’t know how.

pyOpenCl
PyOpenCL can be difficult to install, difficult enough that it may not be realistic on a consumer’s computer. If you can get it installed, it can be extremely powerful. One of the things video cards are designed for is texture handling, which is basically what I was doing. I got about a 200x performance increase: that’s approximately three minutes down to one second.
Note that PyOpenCL relies on Python’s internal garbage collection: things are taken off the video card at the same time they’re removed from Python. Also, PyOpenCL would not compete for memory. If Blender used all the video RAM, PyOpenCL would simply fold and raise an out-of-memory error. Make sure you put everything in memory as soon as possible and keep it there. Also, ensure you only compile your kernel once.

popen
If you know the location of Blender’s internal Python executable, you can run a Python script with it using popen. Unfortunately, Blender’s sys module returns the wrong values. I believe @wkk has a better way of finding it than I do. His method is in his BgeZ module.
If you use the internal player, the process will run until Blender is closed. If you use the external player, it will close when the external player closes. So run your game by clicking “Run in External Player” rather than hitting “p”. In order to read exceptions raised in the second process, you can use an exception logger installed through the exception hook (sys.excepthook).
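As a minimal sketch of the exception logging mentioned above: the second process can install a `sys.excepthook` that appends every uncaught traceback to a file. The function name `install_exception_logger` and the log file path are made up for illustration.

```python
import sys
import traceback


def install_exception_logger(log_path):
    """Append every uncaught exception in this process to log_path."""
    def log_exception(exc_type, exc_value, exc_tb):
        # Write the full traceback to the log file instead of the console,
        # since the console of the second process is not visible.
        with open(log_path, "a", encoding="utf-8") as log:
            traceback.print_exception(exc_type, exc_value, exc_tb, file=log)

    sys.excepthook = log_exception
    return log_exception
```

Calling `install_exception_logger("second_process_errors.log")` (a placeholder file name) as the first statement of the script that popen starts should be enough; anything that crashes the script afterwards ends up in that file.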
I haven’t been able to get popen’s standard out to work, so I communicated with the second process using a socket. I thought “UDP should be reliable enough on localhost”, but I was having connection problems when the processor was overloaded. So now I think “TCP should be fast enough on localhost”, but yeah, I was wrong once. If anyone can figure out a better way to get the data out, please let me know.

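import subprocess
# pathExe = Blender's Python executable, pathPy = script to run, args = list of string arguments
cmd = [pathExe, pathPy] + args
pid = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)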
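
The TCP approach described above can be sketched as follows, runnable with a regular Python install. It uses `sys.executable` as a stand-in for `pathExe` (inside Blender you would substitute the path to the bundled interpreter), and the child's code is passed inline with `-c` instead of a script file. Both are assumptions made to keep the example self-contained, and it is untested inside the BGE.

```python
import socket
import subprocess
import sys

# Code for the second process: connect back to the game and send one message.
CHILD_CODE = """
import socket, sys
port = int(sys.argv[1])
with socket.create_connection(("127.0.0.1", port)) as conn:
    conn.sendall(b"hello from the second process")
"""


def run_child_and_receive():
    # Listen on localhost only; port 0 lets the OS pick a free port.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    port = server.getsockname()[1]

    # sys.executable stands in for pathExe (Blender's bundled Python).
    proc = subprocess.Popen([sys.executable, "-c", CHILD_CODE, str(port)])

    server.settimeout(10.0)  # do not hang forever if the child fails to start
    conn, _addr = server.accept()
    with conn:
        chunks = []
        while True:
            data = conn.recv(4096)
            if not data:  # empty read: the child closed its end
                break
            chunks.append(data)
    server.close()
    proc.wait()
    return b"".join(chunks)


if __name__ == "__main__":
    print(run_child_and_receive())
```

In a game you would not want to block on `accept` and `recv` like this; the usual variation is to make the sockets non-blocking and poll them once per frame.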

Thanks for sharing!!!

Do you have more details, maybe a minimal example using the PyOpenCL approach to play with?

I’ve done threading in the BGE with the threading library. It’s hell to work with, but it’s doable.

I was not able to get PyOpenCL to install into my copy of Blender. Please be aware that it can be difficult to install, so much so that it may not be practical to distribute code that relies on it. Even so, it’s so powerful that it is worth investigating, imho. This code works from within a Python console but may not work in the BGE.
Most of this code is boilerplate. Just copy and paste it.
PyOpenCL can be a bit arcane, but once you work with it a little it’s not too hard to use. If you can’t read the code I’ll add comments or something.

import pyopencl as cl
import numpy as np

ctx = cl.create_some_context(interactive=False)
queue = cl.CommandQueue(ctx)



##############
" Image Demo "
##############

cl_imageDemo = """
__kernel void image_demo(
read_only image2d_t inImg,
const uchar inNum,
write_only image2d_t outImg
)
{
const sampler_t sampler =  CLK_NORMALIZED_COORDS_FALSE | CLK_ADDRESS_CLAMP_TO_EDGE | CLK_FILTER_NEAREST;
int2 post = (int2)(get_global_id(0), get_global_id(1));
uint4 cell = read_imageui( inImg, sampler, post );

// Add inNum to every channel of this pixel.
cell.x += inNum;
cell.y += inNum;
cell.z += inNum;
cell.w += inNum;

write_imageui( outImg, post, cell );
}"""
# Build once at import time. The kernel is named image_demo because some
# OpenCL compilers refuse a kernel called "main".
cl_imageDemo = cl.Program(ctx, cl_imageDemo).build()

def imageDemo(inImage, inNumber):
  """
  Demonstrates how to pass an image into and out of OpenCL.
  """
  outImage = np.empty_like(inImage)
  # OpenCL image sizes are (width, height); numpy shapes are (rows, columns, channels).
  outSize = outImage.shape[1], outImage.shape[0]
  
  inImageBuffer = cl.image_from_array(ctx, inImage, num_channels=4, mode='r')
  inNumberBuffer = np.uint8(inNumber)  # scalar arguments are passed as numpy scalars
  outImageBuffer = cl.image_from_array(ctx, outImage, num_channels=4, mode='w')
  
  args = inImageBuffer, inNumberBuffer, outImageBuffer
  
  cl_imageDemo.image_demo(queue, outSize, None, *args).wait()
  cl.enqueue_copy(queue, outImage, outImageBuffer, origin=(0, 0), region=outSize)
  return outImage



##############
" Array Demo "
##############

cl_arrayDemo = """
__kernel void array_demo(
__global const int2 *inArry,
const uint inNum,
__global int2 *outArry
)
{
int gid = get_global_id(0);
int2 point = inArry[gid];

outArry[gid].x = point.x + inNum;
outArry[gid].y = point.y + inNum;

}"""
# Build once at import time. The kernel is named array_demo because some
# OpenCL compilers refuse a kernel called "main".
cl_arrayDemo = cl.Program(ctx, cl_arrayDemo).build()

def arrayDemo(points, inNumber):
  """
  Demonstrates how to pass an array of points into and out of OpenCL.
  """
  read = cl.mem_flags.READ_ONLY | cl.mem_flags.COPY_HOST_PTR
  write = cl.mem_flags.WRITE_ONLY
  
  inArray = np.array(points).astype(np.int32)
  outArray = np.empty_like(inArray)
  
  inArrayBuffer = cl.Buffer(ctx, read, hostbuf=inArray)
  inNumberBuffer = np.uint32(inNumber)
  # The output buffer only needs the right size; there is nothing to upload.
  outArrayBuffer = cl.Buffer(ctx, write, size=outArray.nbytes)
  
  args = inArrayBuffer, inNumberBuffer, outArrayBuffer
  
  # One work item per point (each row of the array is one int2).
  cl_arrayDemo.array_demo(queue, (inArray.shape[0],), None, *args).wait()
  cl.enqueue_copy(queue, outArray, outArrayBuffer)
  return outArray




def npImage(size):
  """
  Returns a zero-filled square numpy array representing a 0-255 RGBA image.
  """
  # np.zeros rather than np.empty, so the demo output is predictable.
  return np.zeros( (size, size, 4), dtype = np.uint8)




arry = npImage(4)
arry = imageDemo(arry, 100)
print(arry)

points = [(1, 2), (3, 4), (5, 6)]
points = arrayDemo(points, 200)
print(points)

@wkk was teaching me how to use the asyncio library. It looks very promising. I also tried using the multithreading library and it seemed to mostly work. I haven’t tried either in an actual game, though.
It seems like a lot of the prototypes, templates, and tutorials people make don’t work well in production. Part of why I made this thread is that PyOpenCL and popen are two methods I have some working experience with. I have a decent understanding of how they work and when they don’t.
Regardless, if you are having trouble with threading I would recommend trying out asyncio.
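
For anyone trying asyncio, one common pattern is to own the event loop yourself and let it run for a single pass each frame, so coroutines make progress without blocking the game. A minimal sketch follows; `tick` is a hypothetical function you would call once per logic frame, and the `time.sleep` loop at the bottom only simulates frames outside of Blender. It is untested in the BGE.

```python
import asyncio
import time

loop = asyncio.new_event_loop()
finished = []


async def slow_job(name, delay):
    # Any awaitable work goes here; sleep stands in for real I/O.
    await asyncio.sleep(delay)
    finished.append(name)


def tick():
    """Run the callbacks that are ready right now, then return."""
    loop.call_soon(loop.stop)  # schedule a stop so run_forever does one pass
    loop.run_forever()


task = loop.create_task(slow_job("terrain", 0.05))

# Simulated game loop: each pass is one "frame".
while not task.done():
    tick()
    time.sleep(0.01)

loop.close()
print(finished)
```

The job only advances while `tick` is running, so everything stays on the game's own thread and there are no locking concerns; the trade-off is that a coroutine which never awaits will still stall the frame.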

Another way of working with threading is to use Python’s executors:

from concurrent.futures import ThreadPoolExecutor

thread_worker = ThreadPoolExecutor(max_workers=1)

def on_bge_exit():
    '''
    Register this callback somehow to be triggered on game exit.
    '''
    thread_worker.shutdown(wait=True)

Then all you have to do is:

import time

def work():
    time.sleep(10) # seconds
    return 5

def when_done(future):
    # The callback receives the finished Future, not the return value.
    print(future.result())

result = thread_worker.submit(work)
result.add_done_callback(when_done)

This will print 5 after some time, without blocking the game.

Anyway, you still have to be careful with this, as the thread won’t stop immediately: if there are X seconds of work left, you will have to wait for it to finish before you can shut down correctly.
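
One way to shorten that wait, sketched below under the assumption that the work can be split into small steps: have the worker check a `threading.Event` between steps and return early once it is set. The names `stop_requested`, `work`, and `on_bge_exit` are illustrative only.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

thread_worker = ThreadPoolExecutor(max_workers=1)
stop_requested = threading.Event()


def work(steps=100, step_time=0.1):
    """Do the job in small steps so a stop request is noticed quickly."""
    done = 0
    for _ in range(steps):
        # wait() doubles as the sleep and returns True once the event is set.
        if stop_requested.wait(step_time):
            break
        done += 1
    return done


def on_bge_exit():
    stop_requested.set()               # ask the worker to finish early
    thread_worker.shutdown(wait=True)  # now returns after at most one step


future = thread_worker.submit(work)
time.sleep(0.25)  # the game runs for a while...
on_bge_exit()     # ...then exits
print(future.result())
```

The worker still decides when to stop, so the shutdown delay is bounded by the length of one step rather than by the whole job.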

I use the multiprocessing module. Here is a blend to see how I have set it up.

th-test.blend

Here is the part that initializes the Process (same as in the blend).

from multiprocessing import Process, Queue
import time
import random
import bge

cont = bge.logic.getCurrentController()
own = cont.owner

def myproc(cmd=None):
  
    if "path" in cmd:
        return cmd
        
    if "stop" in cmd:
        return False
    
    return "running"

class myproc2:
    
    def __init__(self,data=None):
        self.data = data
        self.tick = 0

    def __getitem__(self, key):
        return getattr(self, key)
    
    def __setitem__(self, key, value):
        setattr(self, key, value)
        
    def main(self,cmd=None):
        
        if cmd:
            self.tick += 1
            if "update" in cmd:
                if self.data:
                    self.data["tick"] = self.tick
                    return [self.data,cmd["update"]]
                return cmd["update"]
            
        return None
    
class Multiprocess(Process):
    def __init__(self,proc=None,name=None):
        Process.__init__(self)
        self.daemon = False
        self.outq = Queue()
        self.inq = Queue()
        
        if name:
            self.name = name
            
        if proc:
            self.ex = proc
            self.start()
        else:
            print("ERROR: nothing to run!!")
        
    
    def run(self):
        # An object with a main() method is driven through it;
        # a plain function is called directly.
        target = self.ex.main if hasattr(self.ex, "main") else self.ex

        while True:
            item = self.inq.get()  # blocks until a command arrives
            if not item:
                continue
            if "stop" in item:
                print("stop",self.name)
                break
            self.outq.put(target(item))
 
    
    def put(self,cmd=None):
        if cmd:
            self.inq.put(cmd)
                
    def get(self,flag=False):
        # flag=False polls without blocking; an empty queue yields None.
        try:
            out = self.outq.get(flag)
        except Exception:
            out = None
        return out

    def empty(self):
        return self.inq.empty()
    

own["test1"] = Multiprocess(myproc2({"Name":"test1"}),name="test1")
own["test2"] = Multiprocess(myproc,name="test2")