#!/usr/bin/python3

import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, GLib, Gdk
import os
from os.path import expanduser
import getpass
import pwd
import re
import resource
import shutil
import shlex
import signal
import subprocess
import glob
import dbus
from dbus.mainloop.glib import DBusGMainLoop
import time
import jack
import configparser

DBusGMainLoop(set_as_default=True)


class SysInfo:
    """Get information about the system"""

    # get info about if rtaccess is setup right
    def user_audio(self):
        """Check whether the current user is listed in the audio group.

        Reads /etc/group and looks at the member list (fourth field) of
        the entry starting with "audio:".

        Returns:
            bool: True when the login name returned by getpass.getuser()
            is in that member list; False otherwise, including when
            /etc/group has no "audio:" entry.
        """
        # Start with an empty member list so a system without an audio
        # group yields False instead of an unbound-name error.
        audio_users = []
        with open("/etc/group", "r") as groups_file:
            for line in groups_file:
                if re.match("^audio:", line):
                    audio_users = line.split(':')[3].rstrip().split(',')
        return getpass.getuser() in audio_users

    def check_pam_files(self):
        """Return True if /etc/security/limits.d/audio.conf exists."""
        return os.path.isfile("/etc/security/limits.d/audio.conf")

    def check_rlimits(self):
        """Return the hard rlimit values for RTPRIO and MEMLOCK.

        Returns:
            tuple: (rtprio_hard_limit, memlock_hard_limit)
        """
        rtprio = resource.getrlimit(resource.RLIMIT_RTPRIO)[1]
        memlock = resource.getrlimit(resource.RLIMIT_MEMLOCK)[1]
        return rtprio, memlock

    # System tweaks
    def get_performance(self):
        """Check whether the policy0 CPU governor is "performance".

        Returns:
            bool: True when a line of the scaling_governor file starts
            with "performance"; False otherwise or when the file is
            missing.
        """
        governor_path = "/sys/devices/system/cpu/cpufreq/policy0/scaling_governor"
        in_performance = False
        if os.path.isfile(governor_path):
            with open(governor_path, "r") as perform_file_test:
                for line in perform_file_test:
                    if re.match("performance", line.rstrip()):
                        in_performance = True
        return in_performance

    def get_boosted(self):
        """Check the Intel boost state.

        Returns:
            bool: True when a line of intel_pstate/no_turbo starts with
            "0"; False otherwise or when the file is missing.
        """
        boost_path = "/sys/devices/system/cpu/intel_pstate/no_turbo"
        boosted = False
        if os.path.exists(boost_path):
            with open(boost_path, "r") as boost_test:
                for line in boost_test:
                    if re.match("0", line.rstrip()):
                        boosted = True
        return boosted


# Audio stuff
RTSetup: # defs for doing things def __init__(self): self.enabled_path = "/etc/security/limits.d/audio.conf" self.disabled_path = "/etc/security/limits.d/audio.conf.disabled" self.backup_file = "/usr/share/studio-controls/audio.conf" def set_governor(self, enable): if enable == True: gov = "performance" else: if os.path.exists("/sys/devices/system/cpu/intel_pstate"): gov = "powersave" else: gov = "ondemand" subprocess.run(["/usr/bin/pkexec", "/usr/sbin/studio-system", gov], shell=False) def set_boost(self, enable): boost_path = "/sys/devices/system/cpu/intel_pstate/no_turbo" if os.path.exists(boost_path): if enable == True: subprocess.run(["/usr/bin/pkexec", "/usr/sbin/studio-system", "boost"], shell=False) else: subprocess.run(["/usr/bin/pkexec", "/usr/sbin/studio-system", "noboost"], shell=False) class StudioControls: global lock_file config = configparser.ConfigParser() def_config = config['DEFAULT'] #tab_conf = config['TABLET'] config_path = "~/.config/autojack" old_config_file = "~/.config/autojackrc" def __init__(self): '''Activate the SysInfo class''' # this is a long chunk of code that initializes every thing # it should probably be split into tabs at least global lock_file self.sysinfo = SysInfo() c_dir = expanduser(self.config_path) if not os.path.isdir(c_dir): os.makedirs(c_dir) self.config_file = expanduser(f"{self.config_path}/autojackrc") lock_file = expanduser(f"{self.config_path}/studio-controls.lock") new_pid = str(os.getpid()) if os.path.isfile(lock_file): with open(lock_file, "r") as lk_file: for line in lk_file: # only need one line old_pid = line.rstrip() if new_pid != old_pid: try: os.kill(int(old_pid), 9) except: print("") time.sleep(1) with open(lock_file, "w") as lk_file: lk_file.write(new_pid) signal.signal(signal.SIGHUP, self.sig_handler) signal.signal(signal.SIGINT, self.sig_handler) signal.signal(signal.SIGQUIT, self.sig_handler) signal.signal(signal.SIGILL, self.sig_handler) signal.signal(signal.SIGTRAP, self.sig_handler) 
signal.signal(signal.SIGABRT, self.sig_handler) signal.signal(signal.SIGBUS, self.sig_handler) signal.signal(signal.SIGFPE, self.sig_handler) #signal.signal(signal.SIGKILL, self.sig_handler) signal.signal(signal.SIGUSR1, self.sig_handler) signal.signal(signal.SIGSEGV, self.sig_handler) signal.signal(signal.SIGUSR2, self.sig_handler) signal.signal(signal.SIGPIPE, self.sig_handler) signal.signal(signal.SIGALRM, self.sig_handler) signal.signal(signal.SIGTERM, self.sig_handler) '''Create the GUI''' builder = Gtk.Builder() builder.add_from_file("/usr/share/studio-controls/studio-controls.glade") '''Get windows''' self.window_main = builder.get_object('window_main') self.window_help = builder.get_object('window_help') self.message_dialog_changes_info = builder.get_object('message_dialog_changes_info') self.message_dialog_rt_info = builder.get_object('message_dialog_rt_info') self.message_dialog_changes_info.set_transient_for(self.window_main) self.message_dialog_rt_info.set_transient_for(self.window_main) self.button_msg_ok = builder.get_object('button_msg_ok') '''Get buttons for system tab''' self.rt_button = builder.get_object('rt_button') self.rt_warning = builder.get_object('rt_warning') self.combo_governor = builder.get_object('combo_governor') self.combo_boost = builder.get_object('combo_boost') self.logging_comb = builder.get_object('logging_comb') self.combo_fw = builder.get_object('combo_fw') '''audio tab stuff''' '''master tab''' self.jack_device_combo = builder.get_object('jack_device_combo') self.jack_usb_dev_combo = builder.get_object('jack_usb_dev_combo') self.chan_in_spin = builder.get_object('chan_in_spin') self.chan_out_spin = builder.get_object('chan_out_spin') self.jack_rate_combo = builder.get_object('jack_rate_combo') self.combobox_late = builder.get_object('combobox_late') self.combo_periods = builder.get_object('combo_periods') self.combo_backend = builder.get_object('combo_backend') self.jack_midi_check = builder.get_object('jack_midi_check') 
self.jack_ind = builder.get_object('jack_ind') self.jack_state = builder.get_object('jack_state') self.dsp_label = builder.get_object('dsp_label') self.xrun_lab = builder.get_object('xrun_lab') '''extra tab''' self.usb_plug_check = builder.get_object('usb_plug_check') self.usb_single_ck = builder.get_object('usb_single_ck') self.combo_zita_add = builder.get_object('combo_zita_add') self.combo_zita_remove = builder.get_object('combo_zita_remove') self.blacklist_add = builder.get_object('blacklist_add') self.blacklist_remove = builder.get_object('blacklist_remove') self.hp_action = builder.get_object('hp_action') self.hp_device = builder.get_object('hp_device') self.monitor_combo = builder.get_object('monitor_combo') self.hp_switch = builder.get_object('hp_switch') '''pulse tab''' self.pj_in_combo = builder.get_object('pj_in_combo') self.pj_out_combo = builder.get_object('pj_out_combo') self.pj_in_name = builder.get_object('pj_in_name') self.pj_out_name = builder.get_object('pj_out_name') self.pj_in_con = builder.get_object('pj_in_con') self.pj_out_con = builder.get_object('pj_out_con') '''Dbus monitoring''' user_bus = dbus.SessionBus() user_bus.add_signal_receiver(self.db_ses_cb, dbus_interface='org.studio.control.event', signal_name='V3_4_signal') user_bus.add_signal_receiver(self.db_new_usb, dbus_interface='org.studio.control.event', signal_name='usb_signal') '''Check if audio.conf and/or audio.conf.disabled exists, returns are true or false''' self.rt_file = False self.jack_file_exists = self.sysinfo.check_pam_files() if self.jack_file_exists and self.sysinfo.user_audio(): rtprio, memlock = self.sysinfo.check_rlimits() if rtprio == 0: self.rt_button.set_label("Logout required") self.rt_button.set_sensitive(False) self.message_dialog_rt_info.show() self.rt_warning.set_text("Session restart required for Real Time Permissions") else: # turn off warning text, check on, deactivate self.rt_warning.set_text("") self.rt_button.set_label("Real Time Permissions Enabled") 
self.rt_button.set_sensitive(False) # show current CPU Governor self.combo_governor.append_text("Performance") if os.path.exists("/sys/devices/system/cpu/intel_pstate/"): self.combo_governor.append_text("Powersave") else: self.combo_governor.append_text("Ondemand") self.in_performance = self.sysinfo.get_performance() if self.in_performance: self.combo_governor.set_active(0) else: self.combo_governor.set_active(1) # show boost state if os.path.exists("/sys/devices/system/cpu/intel_pstate/no_turbo"): self.boosted = self.sysinfo.get_boosted() if self.boosted: self.combo_boost.set_active(1) else: self.combo_boost.set_active(0) else: self.combo_boost.set_sensitive(False) if os.path.exists("/etc/modprobe.d/blacklist-studio.conf"): self.combo_fw.set_active_id("ffado") self.combo_backend.append("firewire", "firewire") ''' combo_fw and combo_backend ''' # Audio stuff default_device = "0,0,0" for adir in glob.glob("/proc/asound/card*/codec*"): idfile = adir.rsplit('/', 1)[0] with open(f"{idfile}/id", "r") as card_file: for line in card_file: # only need one line cname = line.rstrip() if cname != "HDMI" and cname != "NVidia": default_device = f"{cname},0,0" # first set defaults self.config['DEFAULT'] = { 'JACK': 'False', 'DRIVER': "alsa", 'CHAN-IN': '0', 'CHAN-OUT': '0', 'RATE': "48000", 'FRAME': "1024", 'PERIOD': "2", 'ZFRAME': "512", 'XDEV': "", 'PULSE-IN': 'pulse_in', 'PULSE-OUT': 'pulse_out', 'PJ-IN-CON': 'system:capture_1', 'PJ-OUT-CON': 'system:playback_1', 'A2J': "True", 'DEV': default_device, 'USBAUTO': "True", 'USB-SINGLE': "False", 'USBDEV': "", 'LOG-LEVEL': "19", 'BLACKLIST': "", 'PHONE-ACTION': "switch", 'PHONE-DEVICE': default_device, 'MONITOR': 'system:playback_1' } global autojack global newusb global jack_alive global jack_died global xrun_count global jack_ports_changed global jack_time jack_time = 3 xrun_count = 0 autojack = True newusb = False jack_alive = False jack_died = False jack_ports_changed = True self.jack_error_mesg = "" self.jack_info_mesg = "" 
self.dirty = False jack.set_error_function(callback=self.jack_error) jack.set_info_function(callback=self.jack_info) # read in autojack config file c_file = expanduser(self.config_file) if os.path.isfile(c_file): self.config.read(c_file) if self.def_config['MONITOR'] == "none": self.def_config['MONITOR'] = 'system:playback_1' if self.def_config['PJ-IN-CON'] == "0": self.def_config['PJ-IN-CON'] = "none" if self.def_config['PJ-OUT-CON'] == "0": self.def_config['PJ-OUT-CON'] = "none" self.zdev = self.def_config['XDEV'].strip('"').split() self.pulse_in = self.def_config['PULSE-IN'].strip('"').split() self.pulse_out = self.def_config['PULSE-OUT'].strip('"').split() self.p_in_con = self.def_config['PJ-IN-CON'].strip('"').split() self.p_out_con = self.def_config['PJ-OUT-CON'].strip('"').split() self.blacklist = self.def_config['BLACKLIST'].strip('"').split() # Find out if jack is running try: client = jack.Client('controls', False, True) self.def_config['JACK'] = "True" client.close() except jack.JackError: self.def_config['JACK'] = "False" self.logging_comb.set_active_id(self.def_config['LOG-LEVEL']) # fill in Jack master widgets self.jack_rate_combo.set_active_id(self.def_config['RATE']) self.combobox_late.set_active_id(self.def_config['FRAME']) self.combo_periods.set_active_id(self.def_config['PERIOD']) self.combo_backend.set_active_id(self.def_config['DRIVER']) self.chan_in_spin.set_range(0, 128) self.chan_out_spin.set_range(0, 128) self.chan_in_spin.set_value(self.def_config.getint('CHAN-IN')) self.chan_out_spin.set_value(self.def_config.getint('CHAN-OUT')) self.jack_midi_check.set_active(self.def_config['A2J'] == "True") # Fill Extra devices widgets self.usb_plug_check.set_active(self.def_config['USBAUTO'] == "True") if self.def_config['USBAUTO'] == "True": self.usb_single_ck.set_sensitive(True) self.usb_single_ck.set_active(self.def_config['USB-SINGLE'] == "True") else: self.usb_single_ck.set_sensitive(False) 
self.usb_single_ck.set_active(self.def_config['USB-SINGLE'] == "True") self.refresh_dropdowns() self.i_in = 0 self.i_out = 0 self.refresh_pulse_tab(self.i_in, self.i_out) handlers = { "on_window_main_delete_event": self.on_window_main_delete_event, "on_window_help_delete_event": self.on_window_help_delete_event, "on_main_button_cancel_clicked": self.on_main_button_cancel_clicked, "on_main_button_help_clicked": self.on_main_button_help_clicked, "combo_governor_changed_cb": self.combo_governor_changed_cb, "combo_boost_changed_cb": self.combo_boost_changed_cb, "logging_change": self.logging_changed, "firewire_cb": self.firewire_cb, "rt_button_hit": self.rt_button_hit, "on_button_msg_ok_clicked": self.on_button_msg_ok_clicked, "on_button_rt_info_ok_clicked": self.on_button_rt_info_ok_clicked, "on_button_help_ok_clicked": self.on_button_help_ok_clicked, "jack_device_changed": self.jack_device_changed, "usb_master_changed": self.usb_master_changed, "jack_driver_changed": self.jack_driver_changed, "xrun_reset": self.xrun_reset, "cb_jack_start": self.cb_jack_start, "cb_jack_stop": self.cb_jack_stop, "cb_audio_apply": self.cb_audio_apply, "mixer_cb": self.mixer_cb, "pavucontrol_cb": self.pavucontrol_cb, "carla_cb": self.carla_cb, "cb_zita_add": self.cb_zita_add, "cb_zita_remove": self.cb_zita_remove, "cb_blacklist_add": self.cb_blacklist_add, "cb_blacklist_remove": self.cb_blacklist_remove, "usb_plug_cb": self.usb_plug_cb, "hp_action_cb": self.hp_action_cb, "hp_switch_cb": self.hp_switch_cb, "pj_in_combo_cb": self.pj_in_combo_cb, "pj_out_combo_cb": self.pj_out_combo_cb, "pj_in_add_cb": self.pj_in_add_cb, "pj_out_add_cb": self.pj_out_add_cb, "pj_in_rem_cb": self.pj_in_rem_cb, "pj_out_rem_cb": self.pj_out_rem_cb, "pj_in_name_cb": self.pj_in_name_cb, "pj_out_name_cb": self.pj_out_name_cb, } builder.connect_signals(handlers) self.rtsetup = RTSetup() self.timeout_id = GLib.timeout_add(500, self.check_jack_status, None) self.signal_autojack("ping") autojack = False def 
jack_error(self, mesg): if self.def_config['JACK'] == "True": if "not running" in mesg: print(f"jack message received: {mesg}") def jack_info(self, mesg): print(f"jack_info received: {mesg}") def db_ses_cb(*args, **kwargs): ''' this means we got a responce from autojack, Good''' global autojack autojack = True print("autojack is running") def db_new_usb(*args, **kwargs): ''' Autojack tells us a USB audio device has been plugged in or unplugged. ''' global newusb newusb = True print("autojack sees usb change") def check_jack_status(self, user_data): '''Check if jack has died and the client needs to be closed. Check if jack is running then set jack status indicator. Check to see if the device lists have changed and update the gui if so. Updating GUI prevents race with secondary updates caused by updating''' # these variables need to be global as they are used by callbacks global newusb global jack_client global jack_alive global jack_died global xrun_count global jack_ports_changed global jack_time # sent by jackdied() callback if jack_died: jack_client.deactivate() jack_client.close() self.refresh_pulse_io() self.dirty = True jack_died = False jack_alive = False # device changed, update GUI if self.dirty or newusb: self.refresh_dropdowns() self.dirty = False newusb = False if jack_ports_changed: jack_ports_changed = False self.refresh_pulse_io() if jack_alive: load = jack_client.cpu_load() self.dsp_label.set_text(f"DSP: {str(load)[0:4]}%") self.xrun_lab.set_text(f" {str(xrun_count)}") else: if jack_time > 0: jack_time = jack_time - 1 return True else: jack_time = 4 self.dsp_label.set_text("DSP: 0%") self.xrun_lab.set_text(" 0") xrun_count = 0 try: jack_client = jack.Client('controls', False, True) except jack.JackError: self.jack_state.set_text(" Stopped") self.jack_ind.set_from_icon_name("gtk-stop", 0) self.xrun_lab.set_text(" 0") xrun_count = 0 return True self.jack_state.set_text(" Running") self.jack_ind.set_from_icon_name("gtk-apply", 0) # if jack is shutting 
down we need to know jack_client.set_shutdown_callback(self.jackdied) # count xruns jack_client.set_xrun_callback(self.jackxrun) # get notified if a new port is created jack_client.set_port_registration_callback(self.new_jack_port) jack_client.activate() jack_alive = True jack_ports_changed = True self.refresh_pulse_tab(self.i_in, self.i_out) return True def jackdied(state, why, extra): '''gets called by jack if it is exiting, we can't clean up here... so tell the world jack died with a flag instead''' global jack_alive global jack_died jack_died = True jack_alive = False def jackxrun(xruntime, extra): ''' Got an xrun, count it ''' global xrun_count xrun_count = xrun_count + 1 def new_jack_port(port_id, new, extra): ''' jack has added a port tell someone ''' global jack_ports_changed jack_ports_changed = True def xrun_reset(self, button): ''' user asked for an xrun reset so do it ''' global xrun_count xrun_count = 0 def refresh_pulse_io(self): ''' the ports that can be connected to pulse ports varies with what jack offers. 
This refreshes the two drop downs ''' global jack_client global jack_alive self.pj_in_con.set_sensitive(False) self.pj_out_con.set_sensitive(False) self.pj_in_con.get_model().clear() self.monitor_combo.set_sensitive(False) self.monitor_combo.get_model().clear() self.monitor_combo.append(self.def_config["MONITOR"], self.def_config["MONITOR"]) self.monitor_combo.set_active(0) self.pj_in_con.append("none", "no connection") self.pj_in_con.set_active(0) self.pj_out_con.get_model().clear() self.pj_out_con.append("none", "no connection") self.pj_out_con.append("monitor", "Main Output Ports") self.pj_out_con.set_active(0) active_in = self.p_in_con[self.i_in] if (self.p_in_con != []) and (active_in != "0"): self.pj_in_con.append(active_in, active_in) self.pj_in_con.set_active_id(active_in) active_out = self.p_out_con[self.i_out] if (self.p_out_con != []) and (active_out != "0"): if active_out != "monitor": self.pj_out_con.append(active_out, active_out) self.pj_out_con.set_active_id(active_out) if jack_alive: # get capture ports with audio and hardware jack_cap = jack_client.get_ports("", is_audio=True, is_output=True, is_physical=True) jack_play = jack_client.get_ports("", is_audio=True, is_input=True, is_physical=True) left_ch = True last_dev = "" last_port = "" for jport in jack_cap: if left_ch: extra = "" port = jport.name dev = port.split(':', 1)[0] paliases = jport.aliases for palias in paliases: adev = palias.split(':', 1)[0] if adev == "system": if left_ch: extra = f" <{dev}>" dev = adev port = palias if left_ch: last_dev = dev last_port = port left_ch = False else: this_port = last_port if dev == last_dev: right_ch = port.rsplit('_', 1)[1] self.pj_in_con.append(last_port, f"{last_port} and {str(right_ch)}{extra}") left_ch = True else: self.pj_in_con.append(last_port, f"{last_port} (mono){extra}") last_dev = dev last_port = port if this_port == active_in: self.pj_in_con.remove(self.pj_in_con.get_active()) self.pj_in_con.set_active_id(active_in) if not left_ch: 
self.pj_in_con.append(last_port, f"{last_port} (mono){extra}") if last_port == active_in: self.pj_in_con.remove(self.pj_in_con.get_active()) self.pj_in_con.set_active_id(active_in) left_ch = True last_dev = "" last_port = "" for jport in jack_play: if left_ch: extra = "" port = jport.name dev = port.split(':', 1)[0] paliases = jport.aliases for palias in paliases: adev = palias.split(':', 1)[0] if adev == "system": if left_ch: extra = f" <{dev}>" dev = adev port = palias if left_ch: last_dev = dev last_port = port left_ch = False else: this_port = last_port if dev == last_dev: right_ch = port.rsplit('_', 1)[1] self.pj_out_con.append(last_port, f"{last_port} and {str(right_ch)}{extra}") self.monitor_combo.append(last_port, f"{last_port} and {str(right_ch)}{extra}") left_ch = True else: self.pj_out_con.append(last_port, f"{last_port} (mono){extra}") self.monitor_combo.append(last_port, f"{last_port} (mono){extra}") last_dev = dev last_port = port if this_port == active_out: self.pj_out_con.remove(self.pj_out_con.get_active()) self.pj_out_con.set_active_id(active_out) if this_port == self.def_config["MONITOR"]: self.monitor_combo.remove(self.monitor_combo.get_active()) self.monitor_combo.set_active_id(self.def_config["MONITOR"]) left_ch = True if not left_ch: self.pj_out_con.append(last_port, f"{last_port} (mono){extra}") self.monitor_combo.append(last_port, f"{last_port} (mono){extra}") if last_port == active_out: self.pj_out_con.remove(self.pj_out_con.get_active()) self.pj_out_con.set_active_id(active_out) else: self.pj_in_con.append("none", "Ports can not be displayed unless Jack is running.") self.pj_out_con.append("none", "Ports can not be displayed unless Jack is running.") self.monitor_combo.append("none", "Ports can not be displayed unless Jack is running.") self.pj_in_con.set_sensitive(True) self.pj_out_con.set_sensitive(True) self.monitor_combo.set_sensitive(True) def refresh_dropdowns(self): '''this call refreshes the device lists for all drop downs that 
use devices. If backend is not "alsa" then the jack master and USB master are set to default and no usb master. However, all all alsa devices will still be available for bridging and as output device.''' if self.def_config['DRIVER'] == "alsa": self.jack_device_combo.set_sensitive(True) self.jack_usb_dev_combo.set_sensitive(True) self.chan_in_spin.set_sensitive(False) self.chan_out_spin.set_sensitive(False) self.combo_periods.set_sensitive(True) elif self.def_config['DRIVER'] == "firewire": self.jack_device_combo.set_sensitive(False) self.jack_usb_dev_combo.set_sensitive(False) self.chan_in_spin.set_sensitive(False) self.chan_out_spin.set_sensitive(False) self.combo_periods.set_sensitive(True) elif self.def_config['DRIVER'] == "dummy": self.jack_device_combo.set_sensitive(False) self.jack_usb_dev_combo.set_sensitive(False) self.chan_in_spin.set_sensitive(True) self.chan_out_spin.set_sensitive(True) self.combo_periods.set_sensitive(False) self.jack_device_combo.get_model().clear() self.jack_usb_dev_combo.get_model().clear() self.jack_usb_dev_combo.append("", "No USB Master") if self.def_config['DRIVER'] == "alsa": if self.def_config['USBDEV'] == "": self.jack_usb_dev_combo.set_active_id("") else: self.jack_usb_dev_combo.append(self.def_config['USBDEV'], self.def_config['USBDEV']) self.jack_usb_dev_combo.set_active_id(self.def_config['USBDEV']) self.jack_device_combo.append("200,0,0", "USB Jack Master") self.jack_device_combo.set_active_id("200,0,0") self.combo_zita_remove.get_model().clear() self.combo_zita_remove.append("label", "Remove (connected)") self.combo_zita_remove.set_active_id("label") self.combo_zita_add.get_model().clear() self.combo_zita_add.append("label", "Add (available)") self.combo_zita_add.set_active_id("label") self.blacklist_remove.get_model().clear() self.blacklist_remove.append("label", "Remove a Device from the Blacklist") self.blacklist_remove.set_active_id("label") for device in self.blacklist: self.blacklist_remove.append(device, device) 
self.blacklist_add.get_model().clear() self.blacklist_add.append("label", "Add a Device to the Blacklist") self.blacklist_add.set_active_id("label") self.hp_device.get_model().clear() self.hp_device.append(self.def_config['PHONE-DEVICE'], self.def_config['PHONE-DEVICE']) self.hp_device.set_active_id(self.def_config['PHONE-DEVICE']) usb_save = "" pci_save = "" for x in range(0, 10): # card loop cname = "" next_id = "" next_d = "" is_usb = False is_internal = False if os.path.exists(f"/proc/asound/card{str(x)}"): if os.path.isfile(f"/proc/asound/card{str(x)}/id"): with open(f"/proc/asound/card{str(x)}/id", "r") as card_file: for line in card_file: # only need one line cname = line.rstrip() else: cname = str(x) if os.path.exists(f"/proc/asound/card{str(x)}/usbbus"): is_usb = True if os.path.exists(f"/proc/asound/card{str(x)}/codec#0"): is_internal = True for y in range(0, 10): d_type = "" d_desc = "" if os.path.exists(f"/proc/asound/card{str(x)}/pcm{str(y)}p"): d_type = "playback" if os.path.exists(f"/proc/asound/card{str(x)}/pcm{str(y)}c"): if d_type == "": d_type = "capture" else: d_type = f"{d_type} and capture" if d_type != "": for z in range(0, 10): if os.path.exists(f"/proc/asound/card{str(x)}/pcm{str(y)}{d_type[0]}/sub{str(z)}"): with open(f"/proc/asound/card{str(x)}/pcm{str(y)}{d_type[0]}/sub{str(z)}/info", "r") as info_file: for line in info_file: clean_line = line.rstrip() if re.match("^name:", clean_line): line_list = clean_line.split(": ", 1) dname = "" if len(line_list) > 1: dname = line_list[1] next_id = f"{cname},{str(y)},{str(z)}" next_d = f"{next_id} {d_type} ({dname})" # Have everything we need now # this block will fit where ever we know sub-ids and description if not next_id in self.blacklist: self.blacklist_add.append(next_id, next_d) if next_id != self.def_config['PHONE-DEVICE']: self.hp_device.append(next_id, next_d) if is_usb: if usb_save == "": usb_save = next_id if next_id != self.def_config['USBDEV'] and self.def_config['DRIVER'] == "alsa": 
self.jack_usb_dev_combo.append(next_id, next_d) else: if pci_save == "": pci_save = next_id if self.def_config['DRIVER'] == "alsa": self.jack_device_combo.append(next_id, next_d) if self.def_config['USBDEV'] == "" and self.def_config['DEV'] == next_id: self.jack_device_combo.set_active_id(next_id) self.dev_desc = next_d if self.def_config['USBDEV'] == "" and self.def_config['DEV'] == "none" and is_internal: self.jack_device_combo.set_active_id(next_id) self.dev_desc = next_d if (self.def_config['DEV'] != next_id or self.def_config['DRIVER'] != "alsa") and self.zdev.count(next_id) > 0: self.combo_zita_remove.append(next_id, next_d) else: if self.def_config['DEV'] != next_id or self.def_config['DRIVER'] != "alsa": self.combo_zita_add.append(next_id, next_d) if pci_save == "" and self.def_config['USBDEV'] == "": self.def_config['USBDEV'] = usb_save self.jack_usb_dev_combo.set_active_id(usb_save) self.jack_device_combo.append("200,0,0", "USB device below") self.jack_device_combo.set_active_id("200,0,0") '''Functions for all the gui controls''' def on_window_help_delete_event(self, window, event): self.window_help.hide_on_delete() return True def on_main_button_help_clicked(self, button): self.window_help.show() def rt_button_hit(self, button): subprocess.run(["/usr/bin/pkexec", "/usr/sbin/studio-system", "fix"], shell=False) self.rt_button.set_label("Logout required") self.rt_button.set_sensitive(False) self.message_dialog_rt_info.show() self.rt_warning.set_text("Session restart required for Real Time Permissions") # system tweaks def combo_governor_changed_cb(self, button): if button.get_active_text() == "Performance": self.rtsetup.set_governor(True) else: self.rtsetup.set_governor(False) def combo_boost_changed_cb(self, button): if button.get_active_text() == "on": self.rtsetup.set_boost(True) else: self.rtsetup.set_boost(False) self.boosted = self.sysinfo.get_boosted() if self.boosted: self.combo_boost.set_active(1) else: self.combo_boost.set_active(0) def 
logging_changed(self, widget): newval = widget.get_active_id() if self.def_config['LOG-LEVEL'] != newval: self.def_config['LOG-LEVEL'] = newval self.cb_audio_apply(widget) def firewire_cb(self, widget): newval = widget.get_active_id() if newval == "alsa" or newval == "ffado": subprocess.run(["/usr/bin/pkexec", "/usr/sbin/studio-system", newval], shell=False) # Audio setup call backs def cb_zita_add(self, button): a_id = str(button.get_active_id()) if a_id != "None" and a_id != "label": self.zdev.append(str(a_id)) if not self.dirty: self.dirty = True def cb_zita_remove(self, button): a_id = str(button.get_active_id()) if a_id != "None" and a_id != "label": self.zdev.remove(str(a_id)) if not self.dirty: self.dirty = True def cb_blacklist_add(self, widget): a_id = str(widget.get_active_id()) if a_id != "None" and a_id != "label": self.blacklist.append(str(a_id)) self.def_config['BLACKLIST'] = ' '.join(self.blacklist) if not self.dirty: self.dirty = True def cb_blacklist_remove(self, widget): a_id = str(widget.get_active_id()) if a_id != "None" and a_id != "label": self.blacklist.remove(str(a_id)) self.def_config['BLACKLIST'] = ' '.join(self.blacklist) if not self.dirty: self.dirty = True def hp_action_cb(self, widget): a_id = str(widget.get_active_id()) if a_id == "script": print("Script is called ~/.config/autojack/phones.sh") # this needs to be shown as a dialog def hp_switch_cb(self, button): self.signal_autojack("phones") def jack_device_changed(self, button): a_id = str(button.get_active_id()) a_desc = str(button.get_active_text()) if a_id != "None" and a_id != "200,0,0": self.def_config['DEV'] = a_id self.dev_desc = a_desc if not self.dirty: self.dirty = True def jack_driver_changed(self, button): a_driver = str(button.get_active_text()) self.def_config['DRIVER'] = a_driver if not self.dirty: self.dirty = True def usb_master_changed(self, button): a_id = str(button.get_active_id()) if a_id != "None": self.def_config['USBDEV'] = a_id if not self.dirty: self.dirty 
= True def usb_plug_cb(self, widget): a_id = widget.get_active() self.usb_single_ck.set_sensitive(a_id) def refresh_pulse_tab(self, i_in, i_out): ''' Fill in all pulse related widgets ''' '''self.combo_out_ports.set_active_id(self.def_config['PORTS'])''' global jack_ports_changed self.pj_in_combo.set_sensitive(False) self.pj_in_name.set_sensitive(False) self.pj_out_combo.set_sensitive(False) self.pj_out_name.set_sensitive(False) self.pj_in_combo.get_model().clear() for i, pj_name in enumerate(self.pulse_in): self.pj_in_combo.append(str(i), pj_name) if len(self.pulse_in): self.pj_in_combo.set_active(i_in) self.pj_in_name.set_text(self.pulse_in[i_in]) else: self.pj_in_name.set_text("") self.pj_out_combo.get_model().clear() for i, pj_name in enumerate(self.pulse_out): self.pj_out_combo.append(str(i), pj_name) if len(self.pulse_out): self.pj_out_combo.set_active(i_out) self.pj_out_name.set_text(self.pulse_out[i_out]) else: self.pj_out_name.set_text("") self.pj_in_combo.set_sensitive(True) self.pj_in_name.set_sensitive(True) self.pj_out_combo.set_sensitive(True) self.pj_out_name.set_sensitive(True) jack_ports_changed = True def pj_in_name_cb(self, widget): ''' call back for any pulse bridge input name or connect change to current values ''' if not widget.get_sensitive(): return if len(self.pulse_in): self.pulse_in[self.i_in] = "".join(str(self.pj_in_name.get_text()).split()) self.pj_in_name.set_text(self.pulse_in[self.i_in]) self.p_in_con[self.i_in] = str(self.pj_in_con.get_active_id()) self.refresh_pulse_tab(self.i_in, self.i_out) def pj_out_name_cb(self, widget): ''' call back for any pulse bridge output name or connect change to current values ''' if not widget.get_sensitive(): return if len(self.pulse_out): self.pulse_out[self.i_out] = "".join(str(self.pj_out_name.get_text()).split()) self.pj_out_name.set_text(self.pulse_out[self.i_out]) self.p_out_con[self.i_out] = str(self.pj_out_con.get_active_id()) self.refresh_pulse_tab(self.i_in, self.i_out) def 
pj_in_combo_cb(self, widget): ''' callback to look at different pa bridge. need to save name and connection, then refresh name and connections to match ''' if not widget.get_sensitive(): return if widget.get_active() < 0: return self.i_in = self.pj_in_combo.get_active() self.refresh_pulse_tab(self.i_in, self.i_out) def pj_out_combo_cb(self, widget): ''' callback to look at different pa bridge. need to save name and connection, then refresh name and connections to match ''' if not widget.get_sensitive(): return if widget.get_active() < 0: return self.i_out = self.pj_out_combo.get_active() self.refresh_pulse_tab(self.i_in, self.i_out) def pj_in_add_cb(self, widget): ''' need to create a name for the bridge and assign connect as "none". Before switching to display the new bridge we need to save the current bridge info ''' self.i_in = len(self.pulse_in) self.i_out = 0 if len(self.pulse_out): self.i_out = int(self.pj_out_combo.get_active_id()) self.pulse_in.append(f"pulse_in-{str(self.i_in + 1)}") self.p_in_con.append("none") self.refresh_pulse_tab(self.i_in, self.i_out) def pj_out_add_cb(self, widget): ''' need to create a name for the bridge and assign connect as "none". 
Before switching to display the new bridge we need to save the current bridge info ''' self.i_out = len(self.pulse_out) self.i_in = 0 if len(self.pulse_in): self.i_in = int(self.pj_in_combo.get_active_id()) self.pulse_out.append(f"pulse_out-{str(self.i_out + 1)}") self.p_out_con.append("none") self.refresh_pulse_tab(self.i_in, self.i_out) def pj_in_rem_cb(self, widget): ''' get index of current bridge remove name from list by index remove connection from list by index ''' index = self.pj_in_combo.get_active() del self.pulse_in[index] del self.p_in_con[index] self.i_in = 0 self.i_out = 0 if len(self.pulse_out): self.i_out = int(self.pj_out_combo.get_active_id()) self.refresh_pulse_tab(self.i_in, self.i_out) def pj_out_rem_cb(self, widget): ''' get index of current bridge remove name from list by index remove connection from list by index ''' index = self.pj_out_combo.get_active() del self.pulse_out[index] del self.p_out_con[index] self.i_out = 0 self.i_in = 0 if len(self.pulse_in): self.i_in = int(self.pj_in_combo.get_active_id()) self.refresh_pulse_tab(self.i_in, self.i_out) def mixer_cb(self, button): '''callback for mixer button. 
This starts QASMixer with the device set to whatever is jack master''' if self.def_config['USBDEV'] != "": # must be hw:device not hw:device,0,0 mixdevice = self.def_config['USBDEV'].split(',', 1)[0] else: mixdevice = self.def_config['DEV'].split(',', 1)[0] subprocess.Popen(["/usr/bin/qasmixer", "-n", f"--device=hw:{str(mixdevice)}"], shell=False).pid def pavucontrol_cb(self, button): '''callback for pulse control button, opens pavucontrol''' subprocess.Popen(["/usr/bin/pavucontrol"], shell=False).pid def carla_cb(self, button): '''callback for carla button, opens carla''' if os.path.isfile("/usr/bin/carla") and os.access("/usr/bin/carla", os.X_OK): subprocess.Popen(["/usr/bin/carla"], shell=False).pid else: button.set_label("Please Install Carla First") def cb_jack_start(self, button): ''' call back for Jack (re)start button''' global autojack self.def_config['JACK'] = "True" self.config_save() self.signal_autojack("start") def cb_jack_stop(self, button): global autojack self.def_config['JACK'] = "False" self.config_save() self.signal_autojack("stop") def cb_audio_apply(self, button): '''callback for audio tab apply button''' global autojack self.config_save() self.signal_autojack("config") def config_save(self): ''' Write audio setting to ~/.config/autojack/autojackrc''' c_file = expanduser(self.config_file) if not os.path.isfile(c_file): # either first run or old version c_dir = expanduser(self.config_path) if not os.path.isdir(c_dir): os.makedirs(c_dir) if os.path.isfile(expanduser(self.old_config_file)): os.remove(expanduser(self.old_config_file)) with open(expanduser(self.config_file), "w") as rc_file: self.def_config['DRIVER'] = str(self.combo_backend.get_active_id()) self.def_config['CHAN-IN'] = str(int(self.chan_in_spin.get_value())) self.def_config['CHAN-OUT'] = str(int(self.chan_out_spin.get_value())) self.def_config['RATE'] = str(self.jack_rate_combo.get_active_id()) self.def_config['FRAME'] = str(self.combobox_late.get_active_id()) zframe = 
str(int(int(self.combobox_late.get_active_id()) / 2)) if int(zframe) < 128: zframe = str(64) self.def_config['ZFRAME'] = zframe self.def_config['PERIOD'] = str(self.combo_periods.get_active_id()) self.def_config['A2J'] = str(self.jack_midi_check.get_active()) self.def_config['PULSE-IN'] = ' '.join(self.pulse_in) self.def_config['PULSE-OUT'] = ' '.join(self.pulse_out) self.def_config['PJ-IN-CON'] = ' '.join(self.p_in_con) self.def_config['PJ-OUT-CON'] = ' '.join(self.p_out_con) self.def_config['USBAUTO'] = str(self.usb_plug_check.get_active()) self.def_config['USB-SINGLE'] = str(self.usb_single_ck.get_active()) self.def_config['XDEV'] = ' '.join(self.zdev) self.def_config['PHONE-ACTION'] = str(self.hp_action.get_active_id()) self.def_config['PHONE-DEVICE'] = str(self.hp_device.get_active_id()) self.def_config['MONITOR'] = str(self.monitor_combo.get_active_id()) self.config.write(rc_file) def signal_autojack(self, signal): global autojack cmd = f"/usr/bin/dbus-send --type=signal / org.studio.control.event.{signal}_signal" if autojack: subprocess.run(shlex.split(cmd), shell=False) else: time.sleep(5) if autojack: subprocess.run(shlex.split(cmd), shell=False) else: print("Starting Autojack...") # first tell any old autojack to die subprocess.run(["/usr/bin/dbus-send", "--type=signal", "/", "org.studio.control.event.quit_signal"], shell=False) # do it subprocess.Popen(["/usr/bin/autojack"], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT, shell=False).pid def on_button_help_ok_clicked(self, button): self.window_help.hide() def on_button_rt_info_ok_clicked(self, button): self.message_dialog_rt_info.hide() ''' All the ways we can die ''' def on_button_msg_ok_clicked(self, button): self.we_die() def on_main_button_cancel_clicked(self, button): self.we_die() def on_window_main_delete_event(self, *args): self.we_die() def sig_handler(self, signum, frame): ''' a handler for system signals that may be sent by the system. 
we want to trap sigint, sigkill and sigterm and do the same as above. ''' self.we_die() def we_die(self): global jack_alive global jack_client global lock_file if jack_alive: jack_client.close() if os.path.isfile(lock_file): new_pid = str(os.getpid()) if os.path.isfile(lock_file): with open(lock_file, "r") as lk_file: for line in lk_file: # only need one line old_pid = line.rstrip() if new_pid == old_pid: os.remove(lock_file) Gtk.main_quit() us = StudioControls() us.window_main.show_all() Gtk.main()