Как организовать проект на основе парсинга скриптов и вставки данных в postgres?

У меня есть несколько вопросов о моей логике в проекте. Мой проект основан на скриптах, которые анализируют данные с тысячи сайтов (отдельный скрипт для сайта). На самом деле у меня есть около 40 скриптов, но я хочу организовать их для больших данных в будущем. После очистки скрипты имеют метод «insert_data», который вставляет данные (список словарей) в мою базу данных POSTGRESQL. В скриптах для получения данных использую библиотеки: запросы, urllib, иногда selenium (страницы с JS) и BeautifulSoup4 для парсинга загруженных данных. Ниже логика моих скриптов:

class Main():
# initialize variables

def __init__():

def get_url(self, url):
    requests.get(url)
    return soup(url)

# data_insert is list of dicts
def insert_data(self, data_insert):
    cursor.executemany("INSERT INTO table1 (url, title, etc...) VALUES (% (url)s, %(title)s, data_insert)

def __main__(self):
    # 1 looping
    for auction in auctions:
        list_auctions.append(get_href_auction)

    # 2 looping
    for row in list_auctions:
        get_url(row)
        grab some data
        record = {"var1":"value", "var2":"value" etc}
        final_list.append(record)

    # after 2 looping insert data to POSTGRESQL
    insert_data(final_list)

В базе данных у меня есть таблица для каждого веб-сайта и таблица «data_all». Скрипты вставляют данные в таблицу для каждого веб-сайта, и у меня есть триггер после вставки, который загружает данные из этих таблиц в основную таблицу «data_all». Ниже код моего триггера:

CREATE TRIGGER trigger_insert_data_to_main_table
  AFTER INSERT
  ON data_table1
  EXECUTE PROCEDURE insert_data_to_main_table();

CREATE TRIGGER trigger_insert_data_to_main_table
  AFTER INSERT
  ON data_table2
  EXECUTE PROCEDURE insert_data_to_main_table();

и т.д... для всех моих таблиц.

CREATE OR REPLACE FUNCTION insert_data_to_main_table()
  RETURNS trigger AS
$BODY$
BEGIN
 INSERT INTO data_all
 SELECT t1.*
 FROM data_table1 t1
 LEFT JOIN data_all d ON d.url = t1.url
 WHERE d.url IS NULL
 UNION
 SELECT t2.*
 FROM data_table2 t2
 LEFT JOIN data_all d ON d.url = t2.url
 WHERE d.url IS NULL;
 RETURN NULL;
 END;
 SELECT t3.*
 FROM data_table3 t3 

и т.д... для всех моих таблиц.

Эта функция позволяет мне игнорировать дубликаты URL-адресов (которые УНИКАЛЬНЫ для каждой строки) в основной таблице «data_all».

  1. Хорошая логика для парсинга скриптов? Для страниц без JS работает нормально (быстро). Иногда у меня есть только 1 цикл, чтобы получить некоторые данные с главной страницы (без итерации для аукциона).
  2. Это хороший способ вставлять данные таким образом? В будущем мне нужно создать основной скрипт с очередью скриптов очистки
  3. Как обезопасить основной скрипт, чтобы в случае ошибки он возвращал мне сообщение и продолжал работать?
  4. Я недавно читал о многопроцессорности. Стоит ли использовать его в этом проекте для повышения эффективности?
  5. У кого-нибудь есть лучшая идея для захвата данных со страниц JS? Я нахожу решение на основе request.post. Это единственная альтернатива селену в питоне?

Большое спасибо, что прочитали пост до конца и надеюсь, что вы можете мне помочь. С Уважением :)


person Bob    schedule 10.04.2019    source источник


Ответы (1)


вы можете ускорить страницы с селеном, стратегически остановив загрузку страницы, как только вы получите необходимые данные или появится определенный элемент. что касается рекомендаций db, я использовал simpledb, но в последний раз у него были странные проблемы с многопоточностью, поэтому я написал свою собственную локальную простую БД на основе словарей и json, я использую queue.queue() q.get() как взломать, чтобы заблокировать, а затем освободить разделы моего кода базы данных, чтобы я мог использовать их в нескольких потоках

я советую вам поместить правильные функции ожидания () между запросами к одному и тому же домену из уважения ко всем другим агрегаторам данных, если вы злоупотребите веб-сайтом, они усложнят работу для всех остальных. выработайте хорошую логику, основанную на IP-адресах, файлах cookie и URL-адресах/доменных именах, которые пропускают или ждут. я написал свой собственный IP-менеджер, чтобы справиться с этим, у меня даже есть минимальное время ожидания для URL-адресов, хотя я думаю, что большинство веб-сайтов проверяют домен, а не URL-адрес, в основном я проверяю IP-адрес, домен и конкретный URL-адрес в базе данных, чтобы увидеть, будет ли ждать X сумма секунд на основе минимальных значений по умолчанию. все http-запросы проходят через мой ipmanager перед отправкой запросов

если вы используете многопоточность или многопроцессорность, вы хотите разделить функции, выполнение которых занимает больше всего времени, что обычно является HTTP-запросами. Время ожидания ввода db минимально, я не разделяю это на разные потоки, я просто разрешаю только определенные операции происходят по одному потоку за раз, например, функции «обновления» в базе данных. функции чтения не имеют блокировки

вот класс под названием QStack, в основном я добавляю запросы в стек, который возвращает идентификатор, я использую этот идентификатор в своем другом коде для обработки ответа, я могу дать больше информации о коде, если вы хотите объединиться, я пытался понять Почему в последнее время у меня возникают проблемы с подключением к прокси-серверу, я попытался изменить значение проверки на false и прочитал некоторые странные вещи в urllib3 в запросах.

class QStack:

def getproxy(self,fp=0):
    #print 'wait for queue'
    #z=self.qq.get()
    while True:
        if fp:
            p=self.fpxo.get()
        else:
            p=self.pxo.get()
        ipo=ip(p)

        #print 'looking for ip'
        if ipo not in self.inuse:
            self.inuse.append(ipo)
            break
        time.sleep(1)

    #print 'put bob in queue'
    #self.qq.put('bob')
    return p


def start(self):
    self.ts1=time.time()
    self.qc=0
    self.hari2=1
    t = threading.Thread(
    target=self.manage_threads
            )
    t.start()




def stop(self):
    self.hari2=0

def __init__(self,ipm,pxo,
      mthreads=1,fpxo=None):

    self.fpxo=fpxo
    self.hari=1
    self.mthreads=mthreads
    self.ipm=ipm
    self.pxo=pxo

    self.rqs = Queue.Queue()
    self.rps = Queue.Queue()

    self.b = Queue.Queue()
    self.c = Queue.Queue()
    self.c.put('bob')
    self.b.put('bob')
    self.inuse=[]
    self.mc=-1
    self.athreads=0

    self.idc=-1
    self.ct=0

def manage_threads(self):
    while self.hari2:
        if self.athreads<self.mthreads:
            t = threading.Thread(
                target=self.do
            )
            t.start()
            #print 'added thread'*20
        #print 'current threads'*20,self.athreads
        time.sleep(1)

def do(self):
    if self.rqs.empty():
        return -1

    self.athreads+=1
    q=self.rqs.get()
    s = http_incognito(self.ipm)
    s.timeout=self.pxo.timeout
    hari = True

    while hari:
        #print 'hi'
        if q.ua == 'auto':
            s.incognito()
        else:
            s.useragent=q.ua

        s.cookies=q.cookies
        s.referer = q.ref

        fp=0
        self.mc+=1
        if self.mc >= self.mthreads:
            self.mc=0
            fp=1

        p = self.getproxy(fp)
        ipo=ip(p)

        q.p=p
        q.fp=fp
        #print'lalaalla'

        try:


            then=time.time()
            a=s.get(q.url,proxy=p)
            hari=0
            #print 'done'
            #ff=random.choice(xrange(1,10))
            #print 'sleep',ff
            #time.sleep(ff)


            #print'did reqeust'
        except Exception as e:
            print 'HTTP ERROR'
            print str(e)
            self.inuse.remove(ipo)

            if fp:
                self.fpxo.update(p,0)
            else:
                self.pxo.update(p,0)
            self.athreads-=1
            continue


        q.rp=a
        #print 'okayyyy'

        #no blank response
        if not q.rp:
            print 'blank response'*20
            print q.url,q.p


            if fp:
                self.fpxo.update(p,0)
            else:
                self.pxo.update(p,0)

            hari=1
            self.inuse.remove(ipo)
            continue

        if not q.rcheck(q.v()):
            print 'NONONONONONO'

            if fp:
                self.fpxo.update(p,0)
            else:
                self.pxo.update(p,0)


            print 'robot', p
            self.inuse.remove(ipo)
            hari=1
            self.athreads-=1
            continue
        #print 'horehorehore'
        #print 'passed rcheck'

    #print 'lalala'
    self.rps.put(q)
    del s  #remove
    z= q.id


    self.inuse.remove(ipo)
    self.athreads-=1
    return z

def readd(self,q):
    self.rqs.put(
       q)

def add(self,
 url,
 ref=None,
 cookies=None,
 ua='auto',
 headers=None,
 rcheck=None):





    a=self.b.get()


    self.ct +=1

    q = QQuery(url,
        ref,cookies,
         ua,
         headers,
          rcheck=rcheck)

    self.idc += 1
    q.id=self.idc
    self.rqs.put(
       q)


    self.b.put('fred')
    return q.id


def get(self,ide=None):
    a=self.c.get()

    q = self.rps.get()
    if ide != None:
        while q.id != ide:
            self.rps.put(q)
            q=self.rps.get()

    self.c.put('bob')
    return q

используется так:

qinfo={}
pxo=proxymanager()
ipm=ipmanager()
fpxo=myip()

qs=QStack(10,pxo,ipm,fpxo)
for u in urllist:
    ide = qs.add(u)
    qinfo[ide]='type, more info'
qs.start()


while True:
    q=qs.get() 
    info=qinfo[q.id]
    #process query in different ways
    #based on info, id, etc
    find more urls
     qs.add(u)
person Edo Edo    schedule 13.04.2019