Динамически создавать сигнал DBus в python

Я прочитал некоторые темы, связанные с динамическим созданием методов Python, и следовал их инструкциям, но это не сработало. Я не знаю, потому ли это, что я использую декоратор @ или что-то еще.

Код здесь, очень простой.

При запуске этого кода ошибок не возникало, но когда я использую D-feet (инструмент для проверки информации dbus), я не смог найти новые созданные мной сигналы.

#!/usr/bin/python

import dbus
import dbus.service
import dbus.glib
import gobject
from dbus.mainloop.glib import DBusGMainLoop

import psutil

class EventServer(dbus.service.Object):
    i = 0

    @dbus.service.signal('com.github.bxshi.event')
    def singal_example(self,msg):
        """ example of singals
        """
        print msg

    def __init__(self):
        bus_name = dbus.service.BusName('com.github.bxshi.event', bus=dbus.SessionBus())
        dbus.service.Object.__init__(self, bus_name, '/com/github/bxshi/event')

    def create(self):
        self.i +=1
        setattr(self.__class__, 'signal_'+str(self.i), self.singal_example)


if __name__ == "__main__":
    DBusGMainLoop(set_as_default=True)
    bus = dbus.SessionBus()
    eventserver = EventServer()
    gobject.timeout_add(1000,eventserver.create)
    loop = gobject.MainLoop()
    loop.run() 

person bxshi    schedule 07.06.2012    source источник


Ответы (1)


  1. у вас опечатка: singal_example вместо signal_example
  2. в вашем create-методе вы вызываете setattr в классе. Я не знаю, что вы пытаетесь сделать, но вы должны просто подать сигнал

Это фиксированный пример:

#!/usr/bin/python

import dbus
import dbus.service
import dbus.glib
import gobject
from dbus.mainloop.glib import DBusGMainLoop

#import psutil

class EventServer(dbus.service.Object):
    i = 0

    @dbus.service.signal('com.github.bxshi.event')
    def signal_example(self,msg):
        """ example of singals
        """
        print msg

    def __init__(self):
        bus_name = dbus.service.BusName('com.github.bxshi.event', bus=dbus.SessionBus())
        dbus.service.Object.__init__(self, bus_name, '/com/github/bxshi/event')

    def create(self):
        self.i +=1
        #setattr(self.__class__, 'signal_'+str(self.i), self.singal_example)
        self.signal_example('msg: %d' % self.i)


if __name__ == "__main__":
    DBusGMainLoop(set_as_default=True)
    bus = dbus.SessionBus()
    eventserver = EventServer()
    gobject.timeout_add(1000,eventserver.create)
    loop = gobject.MainLoop()
    loop.run()

После этого можно подключиться к сигналу:

# ...
bus = dbus.Bus()
service=bus.get_object('com.github.bxshi.event', '/com/github/bxshi/event')
service.connect_to_signal("signal_example", listener)
# ...
person mata    schedule 07.06.2012
comment
Я не хочу испускать сигналы. Я хочу создать новый сигнал во время выполнения. Я имею в виду, что сначала у меня может быть только 1 сигнал, а после того, как придет запрос от другого процесса, я создам для него новый сигнал. - person bxshi; 07.06.2012