Eventually I tried something like this, which is much simpler than threading and synchronizing data among threads, etc.
Provided that you can somehow limit the blocking time to less than 5 ms, you will not experience any noticeable lag.
The real point is that you must not nag the data receiver continuously with requests, since you get more blocking that way. So you can split the receiving part from the updating part and kind of get away with it.
Since threading is quite unstable, it might not be the best choice. Though there are lots of workarounds (queue, multiprocessing), perhaps at some point another technique can be used, depending on the requirements and other use cases.
import bpy
import time
import random
class DataReceive:
    """Stand-in data source that yields a random value with alternating sign."""

    def __init__(self):
        # Sign multiplier; negated on every receive() call.
        self.flip = 1

    def receive(self):
        """Return a random integer of magnitude 5..10 whose sign alternates.

        The sign is flipped before the value is produced, so the first call
        on a fresh instance returns a negative number.
        """
        self.flip = -self.flip
        magnitude = random.randint(5, 10)
        return self.flip * magnitude
class Interpolate:
    """Smooth a value towards a target by repeated linear interpolation."""

    def __init__(self):
        # Current smoothed value; update() moves it towards ``new``.
        self.this = 0.0
        # Target value; assigned by the caller whenever fresh data arrives.
        self.new = 0.0

    def lerp(self, a, b, blend):
        """Return the linear interpolation between ``a`` and ``b``.

        Args:
            a: Start value (result when ``blend`` is 0).
            b: End value (result when ``blend`` is 1).
            blend: Interpolation factor; it is not clamped.
        """
        return (blend * (b - a)) + a

    def update(self, blend=0.2):
        """Move ``this`` a fraction of the remaining distance towards ``new``.

        Args:
            blend: Fraction of the distance covered by this call. Defaults to
                0.2, the value that used to be hard-coded here, so existing
                ``update()`` calls behave exactly as before.
        """
        self.this = self.lerp(self.this, self.new, blend)
class ModalTimerOperator(bpy.types.Operator):
    """Slide the 'Cube' object along X towards a target refreshed once per second"""
    # NOTE(review): the docstring is kept to one line because Blender
    # presumably shows an operator's docstring as its tooltip - confirm.

    bl_idname = "wm.modal_timer_operator"
    bl_label = "Modal Timer Operator"

    # Handle returned by event_timer_add; None while no timer is installed.
    _timer = None

    def execute(self, context):
        """Validate the scene, initialise state and start the modal timer.

        Returns {'CANCELLED'} when the scene has no object named 'Cube',
        otherwise {'RUNNING_MODAL'}.
        """
        print('execute')

        # validate
        if 'Cube' not in context.scene.objects:
            print('not found Cube')
            return {'CANCELLED'}
        self.object = context.scene.objects['Cube']

        # to receive data
        self.data = DataReceive()
        # to interpolate
        self.interp = Interpolate()
        # to receive data periodically; a monotonic clock is used so that
        # system clock adjustments cannot distort the measured interval
        self.prevtime = time.monotonic()

        # prepare modal (done last so all state exists before events arrive)
        wm = context.window_manager
        self._timer = wm.event_timer_add(0.01, window=context.window)
        wm.modal_handler_add(self)
        return {'RUNNING_MODAL'}

    def modal(self, context, event):
        """Handle one event: ESC stops the operator, TIMER advances the motion.

        Every other event is passed through untouched.
        """
        # cancel with ESC
        if event.type == 'ESC':
            self.cancel(context)
            print('cancelled')
            return {'CANCELLED'}

        # timer update
        if event.type == 'TIMER':
            # receive each second; the clock is read once per tick
            now = time.monotonic()
            if now - self.prevtime > 1.0:
                self.prevtime = now
                self.interp.new = self.data.receive()
                print('new value', self.interp.new)

            # update position
            self.interp.update()
            try:
                self.object.location.x = self.interp.this
            except ReferenceError:
                # NOTE(review): Blender is expected to raise ReferenceError
                # when the referenced object has been removed - confirm.
                # Stop cleanly instead of raising on every timer tick.
                self.cancel(context)
                print('cancelled, object removed')
                return {'CANCELLED'}

        # return pass through
        return {'PASS_THROUGH'}

    def cancel(self, context):
        """Remove the window timer, if one is installed.

        Clearing the handle afterwards makes repeated calls harmless.
        """
        if self._timer is not None:
            wm = context.window_manager
            wm.event_timer_remove(self._timer)
            self._timer = None
def register():
    """Register ModalTimerOperator with Blender via bpy.utils."""
    operator_class = ModalTimerOperator
    bpy.utils.register_class(operator_class)
def unregister():
    """Unregister ModalTimerOperator from Blender via bpy.utils."""
    operator_class = ModalTimerOperator
    bpy.utils.unregister_class(operator_class)
if __name__ == "__main__":
    # Best-effort cleanup so the script can be re-run: try to unregister
    # first and ignore the failure when there is nothing to unregister.
    try:
        unregister()
    except RuntimeError:
        # NOTE(review): unregister_class is expected to raise RuntimeError
        # for a class that is not registered - confirm against the bpy docs.
        # Narrowed from a bare ``except`` so KeyboardInterrupt, SystemExit
        # and genuine programming errors are no longer swallowed.
        pass

    register()

    # Start the operator immediately.
    bpy.ops.wm.modal_timer_operator()