How to use realtime audio input with the bge?

I wanted to know if anyone here has done this and/or can help me out? I know it is possible since I have seen responsive demos in which audio waveforms from the microphone can be displayed visually in the game engine. I can’t really find any tutorials or anything that can help me out. Also tried installing pyaudio with Blender’s python folders with no luck. I’m using plain Blender 2.79.

It’s definitely possible, but I fell short of figuring out how to calculate a single line that represented the sound wave. Displaying the wave is the only “Blender” part of this challenge, so you’ll probably have better luck looking at PyAudio forums or StackOverflow for learning how to record and process it.

As for installing PyAudio, I ended up installing it via pip on an installation of Python that matched Blenders version.

1 Like


Above - me saying the word “testing”

OK so I figured it out, kind of?? Here’s a proof of concept, I adapted this script for the bge. Going to paste my altered version of the script below. Basically it is…slow, so I wound up lowering the audio input quality considerably.

I wanna use this to try swapping out character mouth textures with the measured amplitude of my voice, kinda like live2d. A super rudimentary bge version.

import bge;
from bge import render,logic

render.showProperties(True);
cont=logic.getCurrentController();
own=cont.owner;
if not'fc'in own:
	own['fc']=0;
	own.addDebugProperty("fc",True);
else:
	own['fc']+=1;

import pyaudio
import numpy as np

def audtest():
	CHUNK = 1 # number of data points to read at a time
	RATE = 16000  # time resolution of the recording device (Hz)
	p=pyaudio.PyAudio() # start the PyAudio class
	stream=p.open(format=pyaudio.paInt16,channels=1,rate=RATE,input=True,frames_per_buffer=CHUNK) #uses default input device
	# create a numpy array holding a single read of audio data
	data = np.fromstring(stream.read(CHUNK),dtype=np.int16)
	if -50<data<50:
		print("````````````X");
	elif -100<data<-50 or 50<data<100:
		print("```````````XXX");
	elif -150<data<-100 or 100<data<150:
		print("``````````XXXXX");
	elif -200<data<-150 or 150<data<200:
		print("`````````XXXXXXX");
	elif -250<data<-200 or 200<data<250:
		print("````````XXXXXXXXX");
	elif -300<data<-250 or 250<data<300:
		print("```````XXXXXXXXXXX");
	elif -350<data<-300 or 300<data<350:
		print("``````XXXXXXXXXXXXX");
	elif -400<data<-350 or 350<data<400:
		print("`````XXXXXXXXXXXXXXX");
	elif -450<data<-400 or 400<data<450:
		print("````XXXXXXXXXXXXXXXXX");
	elif -500<data<-450 or 450<data<500:
		print("```XXXXXXXXXXXXXXXXXXX");
	elif -550<data<-500 or 500<data<550:
		print("``XXXXXXXXXXXXXXXXXXXXX");
	elif -600<data<-550 or 550<data<600:
		print("`XXXXXXXXXXXXXXXXXXXXXXX");
	elif data<-600 or 600<data:
		print("XXXXXXXXXXXXXXXXXXXXXXXXX");


# experiment - do every even frame


if str(own['fc']).endswith('0')\
or str(own['fc']).endswith('2')\
or str(own['fc']).endswith('4')\
or str(own['fc']).endswith('6')\
or str(own['fc']).endswith('8'):
	audtest();
1 Like

That’s awesome! Nice work and thanks for sharing your script

1 Like

Ok so this kind of works, but there’s a catch. It slows down the game considerably. I think this is probably because the only way to get this result is by having it run every frame, which means it has to .open() every frame. Is there a way of having it open the PyAudio module only on the first frame of the runtime and having it run concurrently? Or just, in general, faster?

You may try using threading to run PyAudio on a separate thread, I already did that before when I needed to listen to MIDI events with a while loop. Just make sure to kill the thread before exiting game, as it will still run even after the engine has ended. Disabling the exit key, setting a key to kill the thread and exit the game manually should do the job.

Trying to figure this out with no luck. Do you have a very simple example of how to implement threading in the bge? My goal is to open it once and run the thread concurrently with the game and feed the audio information to properties, and then close it when the game quits.
I tried using this but, no luck so far.
Also here’s what I tried

import pyaudio
import numpy as np
import time
import threading

p=pyaudio.PyAudio() # start the PyAudio class
CHUNK = 4096 # number of data points to read at a time
RATE = 44100 # time resolution of the recording device (Hz)

stream=p.open(format=pyaudio.paInt16,channels=1,rate=RATE,input=True,frames_per_buffer=CHUNK) #uses default input device

def to_run(): # should not take arguments.
    data = np.fromstring(stream.read(CHUNK),dtype=np.int16)
    print(data)

new_thread = threading.Thread() # create a new thread object.
new_thread.run = to_run
new_thread.start() # the new thread is created and then is running.

Example of making a function run on another thread in BGE (see the console for the output of the function running in another thread):

ex_threading.blend (422.4 KB)

import bge
import threading

from time import time
from bge.logic import globalDict

startTime = time()

# Disables default exit key
bge.logic.setExitKey(0)

# Audio detection function that will run in another thread.
# You can even run while loops here without freezing the engine.
def runAudioDetection():
    while globalDict["Running"]:
       print("> Time elapsed:", time() - startTime)

def main(cont):
    always = cont.sensors[0]
    
    # When Esc key is just pressed
    exitKey = bge.logic.keyboard.events[bge.events.ESCKEY] == 1
    
    if always.positive:
        
        # Create and run thread at start
        if not 'AudioThread' in globalDict.keys():
            
            # Sinalizes to runAudioDetection that its while loop can run
            globalDict["Running"] = True
            
            # Create and run thread object with the given function
            globalDict["AudioThread"] = threading.Thread(target=runAudioDetection)
            globalDict["AudioThread"].setDaemon(True)
            globalDict["AudioThread"].start()
        
        # Finish thread and end game
        if exitKey:
            
            # Sinalizes to runAudioDetection to stop running while loop 
            globalDict['Running'] = False
            
            # Finish thread
            globalDict["AudioThread"].join()
            del globalDict["AudioThread"]
            print('> Thread joined!')
            
            # End game
            bge.logic.endGame()

I’d suggest increasing the chunk size. I use 1024 in my scripts (yours is currently set to 1) and it runs fine for me without threading.

You only need to open once, when you want to start recording. Every frame you only need to call read(chunk) on the stream

thank you so much. this is working almost perfectly for me so far!

1 Like

if anyone comes here looking for the same thing I was looking for, here’s the script so far. It’s a little rough and needs some tweaking but this seems to be working for me:

import bge
import threading
import pyaudio
import numpy as np

from time import time
from bge.logic import globalDict



cont=bge.logic.getCurrentController();
own=cont.owner;
if not'mic_start'in own:
	own['mic']="0"
	own['talking']=False;
	own.addDebugProperty("mic");
	own.addDebugProperty("talking");
    own['mic_start']=True;

if not'fc'in own:      # frame counter
    own['fc']=0;
    own.addDebugProperty("fc");
own['fc']+=1;


p=pyaudio.PyAudio() # start the PyAudio class
CHUNK = 64 # number of data points to read at a time
RATE = 44100 # time resolution of the recording device (Hz)

startTime = time()

# Disables default exit key
bge.logic.setExitKey(0)

def runAudioDetection():
	stream=p.open(format=pyaudio.paInt16,channels=1,rate=RATE,input=True,frames_per_buffer=CHUNK,) #uses default input device
	while globalDict["Running"]:
		datalist = np.fromstring(stream.read(CHUNK,exception_on_overflow=False),dtype=np.int16,)
		abslist=[];
		for data in datalist:
			abslist.append(abs(data));
		data_avg=int(sum(abslist)/len(abslist));
		
		a=data_avg/8;

		if 					0			<a<=	5:
			print("```````````````````");
		elif				5			<a<=	10:
			print("```````````````````X");
		elif				10		<a<=		15:
			print("``````````````````XX");
		elif				15		<a<=		20:
			print("``````````````````XXX");
		elif				20		<a<=		25:
			print("`````````````````XXXX");
		elif				25		<a<=		30:
			print("`````````````````XXXXX");
		elif				30		<a<=		35:
			print("````````````````XXXXXX");
		elif				35		<a<=		40:
			print("````````````````XXXXXXX");
		elif				40		<a<=		45:
			print("```````````````XXXXXXXX");
		elif				45		<a<=		50:
			print("```````````````XXXXXXXXX");
		elif				50		<a<=		55:
			print("``````````````XXXXXXXXXX");
		elif				55		<a<=		60:
			print("``````````````XXXXXXXXXXX");
		elif				60		<a<=		65:
			print("`````````````XXXXXXXXXXXX");
		elif				65		<a<=		70:
			print("`````````````XXXXXXXXXXXXX");
		elif				70		<a<=		75:
			print("````````````XXXXXXXXXXXXXX");
		elif				75		<a<=		80:
			print("````````````XXXXXXXXXXXXXXX");
		elif				80		<a<=		85:
			print("```````````XXXXXXXXXXXXXXXX");
		elif				85		<a<=		90:
			print("```````````XXXXXXXXXXXXXXXXX");
		elif				95		<a<=		100:
			print("``````````XXXXXXXXXXXXXXXXXX");
		elif				100		<a<=		105:
			print("``````````XXXXXXXXXXXXXXXXXXX");
		elif				105		<a<=		110:
			print("`````````XXXXXXXXXXXXXXXXXXXX");
		elif				110		<a<=		115:
			print("`````````XXXXXXXXXXXXXXXXXXXXX");
		elif				115		<a<=		120:
			print("````````XXXXXXXXXXXXXXXXXXXXXX");
		elif				120		<a<=		125:
			print("````````XXXXXXXXXXXXXXXXXXXXXXX");
		elif				125		<a<=		130:
			print("```````XXXXXXXXXXXXXXXXXXXXXXXX");
		elif				130		<a<=		135:
			print("```````XXXXXXXXXXXXXXXXXXXXXXXXX");
		elif				135		<a<=		140:
			print("``````XXXXXXXXXXXXXXXXXXXXXXXXXX");
		elif				140		<a<=		145:
			print("``````XXXXXXXXXXXXXXXXXXXXXXXXXXX");
		elif				145		<a<=		150:
			print("`````XXXXXXXXXXXXXXXXXXXXXXXXXXXX");
		elif				150		<a:
			print("`````XXXXXXXXXXXXXXXXXXXXXXXXXXXXX");
		
		
		own['mic']=int(data_avg);
		
                #you can set a "threshold" for when you
                #want to activate something to happen
                #depending on how loud the input is
        if a>45:
			own['talking']=True;
		else:
			own['talking']=False;

               #Just to make sure things don't slow down, I close and restart
               #the audio stream once every second at 60fps, which doesn't seem
               #to be noticeable (so far)
		if own['fc']%60==0:
			stream.stop_stream()
			stream.close()

def audmain(cont):
		always = cont.sensors[0]
		
		# When Esc key is just pressed
		exitKey = bge.logic.keyboard.events[bge.events.ESCKEY] == 1
		
		if always.positive:
				
				# Create and run thread at start
				if not 'AudioThread' in globalDict.keys():
						
						# Sinalizes to runAudioDetection that its while loop can run
						globalDict["Running"] = True
						
						# Create and run thread object with the given function
						globalDict["AudioThread"] = threading.Thread(target=runAudioDetection)
						globalDict["AudioThread"].setDaemon(True)
						globalDict["AudioThread"].start()
				
				# Finish thread and end game
				if exitKey:
						
						# Sinalizes to runAudioDetection to stop running while loop 
						globalDict['Running'] = False
						
						# Finish thread
						globalDict["AudioThread"].join()
						del globalDict["AudioThread"]
						print('<-------------------------------------')
						
						# End game
						bge.logic.endGame()
						
				#experimental - restart once every 1 seconds
				if'fc'in own:
					if own['fc']%60==0:
						# Sinalizes to runAudioDetection to stop running while loop 
						globalDict['Running'] = False
						
						# Finish thread
						globalDict["AudioThread"].join()
						del globalDict["AudioThread"]
						print('<-------------------------------------')

should give you something like this: