Функция многопроцессорного пула apply_async при применении к объекту выдает ошибку «Невозможно выбрать объекты ‹type 'thread.lock»›

Моя функция __init__ пытается вызвать функцию класса Scheduler с именем verify_func().

def__init__:
def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
    return func.__get__(obj, cls)


if __name__=='__main__':
  while True:
    #Create a scheduler object
    scheduler_obj=Scheduler()
    try:
      #Connect to the database get all the new requests to be verified
      db = Database(scheduler_obj.username_testschema, scheduler_obj.password_testschema, scheduler_obj.mother_host_testschema,
                    scheduler_obj.mother_port_testschema, scheduler_obj.mother_sid_testschema, 0)
      result = db.query("SELECT JOB_ID FROM %s.table_1 WHERE JOB_STATUS='%s'" % (scheduler_obj.username_testschema, 'initiated'))
      copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
      pool = mp.Pool(len(result))
      for row in result:
        verify_id = row[0]
        print mp.current_process()
        try:
          pool.apply_async(scheduler_obj.verify_func, (verify_id))
        except Exception as e:
                    scheduler_obj.logger.exception("MP exception : %s", e)

      pool.close()
      pool.join()

    except Exception as e:
      scheduler_obj.logger.exception(e)
    finally:
      time.sleep(10)

Этот verify_func пытается выполнить несколько запросов к базе данных, используя cx_oracle.

def verify_func(self,verify_id):
    print "Verifying with Current Id : ", verify_id
    try:
      db = Database(self.username_testschema, self.password_testschema, self.mother_host_testschema,
                    self.mother_port_testschema, self.mother_sid_testschema, 0)
      result = db.query("select * from %s.table_2 where job_id=%d" % (self.username_testschema,verify_id))
      column_name = [d[0] for d in db.cursor.description]
      final_dictionary = [dict(zip(column_name, row)) for row in result]

      #On getting the new job_ids whose status is .. try to run checks for each one.
      for row in final_dictionary:
        print row

Так что это подключение и запуск запроса к базе данных, который выдает эту ошибку, поскольку не говорит, где именно он ломается.

class Database(object):
 def __init__:
    self.connection = cx_Oracle.connect(self.connect_string, mode=self.mode) 

Дайте мне знать, если потребуется больше объяснений.


person TommyT    schedule 02.04.2015    source источник
comment
Внутри экземпляра Scheduler есть объект, который содержит threading.Lock, который нельзя замариновать. Вероятно, вам потребуется добавить методы __getstate__/__setstate__ в класс Scheduler, чтобы удалить объект-нарушитель из экземпляра перед его травлением.   -  person dano    schedule 02.04.2015
comment
Хорошо, но какой объект, нет возможности отследить его? Позвольте мне попробовать это. Спасибо @дано.   -  person TommyT    schedule 02.04.2015
comment
Обычно я копаюсь в исходном коде, чтобы понять это. Вероятно, вы можете добавить некоторую трассировку в модуль pickle, которая поможет определить, какой именно объект, но я не могу сказать вам, какую функцию добавить навскидку.   -  person dano    schedule 02.04.2015
comment
Да, я сделал scheduler_object.__dict__ и там я увидел, что у меня есть переменная экземпляра регистратора, которую нельзя выбрать. Спасибо Дано.   -  person TommyT    schedule 02.04.2015


Ответы (1)


Да, я сделал scheduler_object.dict и там я увидел, что у меня есть переменная экземпляра регистратора, которую нельзя выбрать. Спасибо @Дано.

Таким образом, лучший способ отладки - выяснить, что может быть переменной/методом класса, если вы получаете эту ошибку. Кроме того, вы можете переопределить методы класса регистратора getstate , setstate, чтобы сделать его доступным для обработки, как показано здесь. Не удается замариновать регистраторы?

person TommyT    schedule 02.04.2015
comment
Попробуйте добавить import dill в начало кода. dill знает, как травить методы экземпляра, блокировки и прочие вещи, не занимаясь тем _unpickle_method делом, которым вы занимаетесь. Если это не сработает, т. к. вы используете multiprocessing, вместо этого вы можете использовать pathos.multiprocessing, который является ответвлением multiprocessing, заменяющим использование pickle на использование dill. - person Mike McKerns; 03.04.2015
comment
Я попробую это, добрый сэр. - person TommyT; 03.04.2015