import threading
import urllib.parse
import httplib2
import urllib.robotparser
from . import dbcontrol
import time
import hashlib
import os,tempfile
from lxml import etree
import re,random
import socket
import tldextract
#scheme='https', netloc='www.surfpoeten.de:443', path='/about/tube/', params='', query='tube=3&hallo=7', fragment='id#asd
import traceback
socket.setdefaulttimeout(120)
robots_cache = {}
class Crawler(threading.Thread):
allowed_sites = [r'''.*''']
forbidden_sites = []
user_agent = 'Mozilla Firefox'
db_pool = None
db_table = 'crawl'
db_sequence = 'crawl_position'
db_domains = 'crawl_domains'
crawl_maxdepth = 1000
"""
Initialize object
"""
def __init__(self,db_pool,db_table='crawl'):
self.db_pool=db_pool
self.db_table=db_table
super(Crawler,self).__init__()
class EchoTarget:
def __init__(self,url):
self.links=[]
self.purl = urllib.parse.urlparse(url)
def start(self, tag, attrib):
if tag!='a':
return
try:
link = attrib['href']
except:
return
lp=urllib.parse.urlparse(link)
if lp.scheme=='':
scheme=self.purl.scheme
else:
scheme=lp.scheme
if lp.netloc=='':
netloc=self.purl.netloc
else:
netloc=lp.netloc
if lp.netloc=='' or lp.scheme=='':
k = urllib.parse.ParseResult(
scheme,netloc,
lp.path,
lp.params,
lp.query,
lp.fragment
)
link = urllib.parse.urlunparse(k)
if scheme!='http' and scheme != 'https':
return
a = {}
a['url']=link
if netloc!=self.purl.netloc:
a['x']=True
else:
a['x']=False
self.links.append(a)
        def siteallowed(self,url):
            # Superseded by Crawler.is_site_allowed(); currently every site
            # is accepted, so the pattern check below is never reached.
            return True
            # print("CHECK:", url)
            for site in Crawler.allowed_sites:
                res = re.search("^%s" % (site,), url)
                if res != None:
                    # print("YES")
                    return True
            # print("NO")
            return False
def close(self):
return "closed"
fetch_dir='data'
fetch_prefix='epub_'
    forbidden_sites = [
        r'''^.*\.youtube\.*''',
        r'''^.*\.amazon\.*''',
        r'''^.*\.ebay\.*''',
        r'''^.*\.facebook\.*''',
        # r'''^.*\.google\.*''',
        r'''^.*\.wikipedia\.*''',
        r'''^.*\.libri\.de*'''
    ]
def find_match(self,sitelist,url):
for s in sitelist:
if re.search(s,url) != None:
return True
return False
    def is_site_allowed(self,url):
        if self.allowed_sites != False:
            if not self.find_match(self.allowed_sites,url):
                return False
        if self.find_match(self.forbidden_sites,url):
            return False
        return True
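    # Illustration (not from the original source): with the default
    # allowed_sites/forbidden_sites defined above, is_site_allowed() behaves
    # roughly as follows (the URLs are made-up examples):
    #   is_site_allowed('https://www.youtube.com/watch?v=x')  -> False
    #   is_site_allowed('https://www.example.org/index.html') -> True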
""" domain CHARACTER VARYING NOT NULL,
"""
"""
Create the crawler table
"""
def db_create_tables(self):
sql = """
CREATE TABLE %(table)s
(
url CHARACTER VARYING NOT NULL,
visits INTEGER DEFAULT 0,
priority INTEGER DEFAULT 0,
position INTEGER DEFAULT 0,
content CHARACTER varying DEFAULT NULL,
ref CHARACTER VARYING DEFAULT NULL,
depth INTEGER DEFAULT 0,
sched TIMESTAMP WITHOUT TIME ZONE DEFAULT NOW()
);
ALTER TABLE %(table)s ADD CONSTRAINT %(table)s_pk PRIMARY KEY (url);
CREATE INDEX %(table)s_idx1 ON %(table)s USING BTREE
(priority DESC ,visits ASC, depth ASC, position ASC);
""" % {'table':self.db_table}
cursor = self.db_pool.get_cursor()
cursor.execute(sql)
sql = """CREATE SEQUENCE %(sequence)s START 1""" % {
'sequence':self.db_sequence
};
cursor.execute(sql);
sql = """CREATE TABLE %(table)s (
domain CHARACTER VARYING NOT NULL
);
ALTER TABLE %(table)s ADD CONSTRAINT %(table)s_pk PRIMARY KEY (domain);
""" % {'table':self.db_domains }
cursor.execute(sql);
self.db_pool.free_cursor(cursor)
def db_repair_tables(self):
"""
        Repair the database after an unwanted crawler termination
"""
sql = "UPDATE "+self.db_table+" SET sched = NOW() WHERE sched IS NULL"
cursor = self.db_pool.get_cursor()
cursor.execute(sql)
self.db_pool.free_cursor(cursor)
def db_drop_tables(self):#
"""
Drop the crawler table
"""
sql = "DROP TABLE IF EXISTS "+self.db_table
sql += "; DROP SEQUENCE IF EXISTS "+self.db_sequence
sql += "; DROP TABLE IF EXISTS "+self.db_domains
cursor = self.db_pool.get_cursor()
cursor.execute(sql)
self.db_pool.free_cursor(cursor)
    def check_robots(self,p):
        """
        Check whether a URL is excluded by robots.txt
        Input: p = parsed URL (urllib.parse.ParseResult)
        Returns True if the page can be fetched
        Otherwise this function returns False
        """
        url = urllib.parse.urlunparse(p)
        try:
            rp = robots_cache[p.netloc]
        except KeyError:
            try:
                rp = urllib.robotparser.RobotFileParser()
                rpurl = urllib.parse.ParseResult(p.scheme,p.netloc,'/robots.txt','','','')
                rurl = urllib.parse.urlunparse(rpurl)
                rp.set_url(rurl)
                rp.read()
                robots_cache[p.netloc] = rp
            except Exception:
                return False
        try:
            if not rp.can_fetch('*',url):
                # self.log(url,"Excluded by robots.txt")
                return False
        except Exception:
            return False
        return True
"""
    Add a URL to the database
"""
def db_addurl(self,url,
depth=0,
tmin=0,
tmax=0,
priority=0,
ref=None):
if self.is_site_allowed(url)==False:
print ("ADDURL NOT ALLOWED: ",url)
return
if depth>self.crawl_maxdepth:
print ("ADDURL MAXDEPTH EXCEEDED: ",url,"DEPTH:",depth,"MAXDEPTH",self.crawl_maxdepth)
return False
u = urllib.parse.urlparse(url)
        if not self.check_robots(u):
            print ("ADDURL EXCLUDED BY ROBOTS.TXT: ",url)
            return
if tmin<tmax:
tx = random.randint(tmin,tmax)
else:
tx = tmin
# get cursor
cursor = self.db_pool.get_cursor()
try:
print ("ADDURL: ",url,"DEPTH:",depth,"PRIO:",priority)
# update depth if possible
sql = "UPDATE "+self.db_table+ """ SET depth=%s WHERE url=%s AND depth>%s"""
cursor.execute (sql,(depth,url,depth))
sql = "INSERT INTO " + self.db_table + """
(url,ref,depth,priority,position)
VALUES(%(url)s,%(ref)s,%(depth)s,%(priority)s,nextval(%(sequence)s) )
ON CONFLICT DO NOTHING
"""
# eurl = str(kurl).encode("utf-8")
# eurl = kurl
cursor.execute(sql,{
'sequence':self.db_sequence,
'table':self.db_table,
'url':url,
'ref':ref,
'depth':depth,
'priority':priority
});
# (u.scheme,u.netloc,u.path,u.query,u.fragment,ref,depth,priority))
return False
except Exception as e:
print ("FAILED SQL?",cursor.query)
traceback.print_exc()
# raise
# if type(e) != dbcontrol.pgdb.IntegrityError:
# raise
return False
finally:
self.db_pool.free_cursor(cursor)
def fetchbody_to_tempfile(self,resp,maxlen=0):
"""
Read the response body to a temporary file.
The temporary file will be created.
Returns the name of the created file.
"""
(handle,name) = tempfile.mkstemp(
dir=self.fetch_dir,prefix=self.fetch_prefix
)
curlen=0
data = resp.read(100000)
while len (data) > 0 :
os.write(handle,data)
data=resp.read(100000)
os.close(handle)
return name
def create_temp_file(self):
(handle,name) = tempfile.mkstemp(
dir=self.fetch_dir,prefix=self.fetch_prefix
)
os.close(handle)
handle = open(name,'a+')
return (handle,name)
def fetchbody_to_file(self,resp,handle,maxlen=0):
blk_size=8000
md5 = hashlib.md5()
data = resp.read(blk_size)
while len(data)>0:
if handle.tell()+len(data) > maxlen and maxlen != 0:
diff = maxlen-handle.tell()
handle.write(data[0:diff])
md5.update(data[0:diff])
return md5.hexdigest()
md5.update(data)
handle.write(data)
data = resp.read(blk_size)
return md5.hexdigest()
def fetchbody(self,resp,maxlen=0):
body = resp.read(maxlen)
return body
def handle_response(self,row,resp,body):
mime = resp['content-type'].split(';')[0]
if mime=='text/html' or mime=='text/xml':
self.handle_response_html(row,resp,body)
"""
    Add a list of URLs
"""
def db_addurllist(self,l,ref,depth):
for i in l:
if not i['x']:
# link goes to an internal page
d = depth+1
else:
# link leads to an external page
d = 0
domain = self.extract_domain(i['url'])
rc = self.lookup_domain(domain)
if rc:
p=0
else:
print("NEW DOMAIN FOUND:",domain)
p=1
self.db_addurl(i['url'],depth=d,ref=ref,priority=p)
def lookup_domain(self,domain):
        cursor = self.db_pool.get_cursor()
        try:
sql = 'SELECT * FROM ' + self.db_domains + """ WHERE domain=%(domain)s """
cursor.execute ( sql,{'domain':domain} )
result = cursor.fetchone()
if result != None:
return True
sql = "INSERT INTO " + self.db_domains + """
(domain) VALUES(%(domain)s)
ON CONFLICT DO NOTHING
"""
cursor.execute(sql,{'domain':domain})
return False
except:
print ("FAILED SQL?",cursor.query)
traceback.print_exc()
return False
finally:
self.db_pool.free_cursor(cursor)
    def handle_response_html(self,row,resp,body):
        # Collect the links found in the page body.
        l = self.findlinks(body,row['url'])
        # httplib2 responses behave like a dict of lower-cased headers; if the
        # request was redirected, queue the redirect target as well.
        al = ''
        for name,value in resp.items():
            if name.lower() == 'location':
                al = value
                break
        if al != '':
            l.append({'url':al,'x':False})
        self.db_addurllist(l,row['url'],row['depth'])
        return
    def mime_dispatcher(self,mime):
        try:
            func = self.mimejump[mime]
        except KeyError:
            # Fall back to the default handler for unknown MIME types.
            func = Crawler.mime_default
        func(self)
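    # Illustration (assumption, not from the original source): entries in
    # mimejump map a MIME type to its handler, anything else falls back to
    # mime_default().
    #   self.mime_dispatcher('text/html')   # calls mime_text_html()
    #   self.mime_dispatcher('image/png')   # calls mime_default()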
def reschedule_url(self,row):
if row==None:
return
print("RESCHEDULE:",row['url'])
cursor = self.db_pool.get_cursor()
        try:
            sql = "UPDATE " + self.db_table + """
                SET position=nextval(%(sequence)s), priority=0, content=%(content)s, visits=%(visits)s WHERE url=%(url)s"""
            content = row['content']
            if isinstance(content,bytes):
                # httplib2 returns the body as bytes; the content column is text.
                content = content.decode('utf8','replace')
            cursor.execute(sql,{
                'sequence':self.db_sequence,
                'visits':row['visits'],
                'url':row['url'],
                'content':content
            })
except:
print ("FAILED SQL?",cursor.query)
traceback.print_exc()
return False
finally:
self.db_pool.free_cursor(cursor)
"""
Fetch one url
"""
def processrow(self,r):
try:
p = urllib.parse.urlparse(r['url'])
conn = httplib2.Http()
# url = urllib.parse.urlunparse(p)
# conn.follow_redirects = False
print ("FETCH:",r['url'])
resp,body = conn.request(
r['url'],
method="GET",
headers={'user-agent': self.user_agent}
)
self.handle_response(r,resp,body)
r['visits']=r['visits']+1
r['content']=body
except:
traceback.print_exc()
return False
    def get_next_url(self):
        db = self.db_pool.get_conn()
        cursor = db.cursor()
        try:
sql = """SELECT * FROM %s WHERE
position > 0 ORDER BY
priority DESC, visits ASC, depth ASC, position ASC LIMIT 1 FOR UPDATE """ % (self.db_table,)
cursor.execute(sql)
r = cursor.fetchone()
if r != None:
r = dbcontrol.row_to_dict(cursor,r)
sql = "UPDATE " +self.db_table+" SET position=0 WHERE url=%(url)s "
cursor.execute(sql,r )
db.commit()
return r
except Exception as e:
print ("FAILED SQL?",cursor.query)
traceback.print_exc()
db.rollback()
finally:
cursor.close()
self.db_pool.free_conn(db)
    def test(self):
        while True:
            row = self.get_next_url()
            if row == None:
                return False
            self.processrow(row)
            # Put the URL back into the queue regardless of the fetch result.
            self.reschedule_url(row)
            # os._exit(0)
            return True
def run(self):
print("Starting ", self.tname)
lastrc=False
while True:
rc = self.test()
if rc == False:
                print("nothing to do")
# self.log("Nothing to do - sleeping.")
time.sleep(1)
lastrc=rc
time.sleep(0.01)
# return if the main thread has finished
en = threading.enumerate()
if not en[0].is_alive():
return
"""
Update the next url
"""
def updatenexturl(self):
try:
db = self.db_pool.get_conn()
except:
self.log("Error getting connection pool");
return
try:
cursor=db.cursor()
while True:
try:
sql = """SELECT url,depth FROM %s
WHERE sched<NOW() ORDER BY
sched ASC LIMIT 1 FOR UPDATE NOWAIT"""
cursor.execute(sql,(self.db_table,))
print("EXECUTED")
break
except:
print("EXception !")
db.rollback()
time.sleep(0.5)
print("AFTER")
row =cursor.fetchone()
if row == None:
time.sleep(0.5)
return False
(url,depth)=row
print("R: ",url,depth)
print("ASDASD\n")
return
# print "RESEOARVER",url
sql = "UPDATE plag_crawl SET sched=NOW()+interval '1 days' WHERE url=%s"
cursor.execute(sql,(url,))
db.commit()
# self.log(url,"TEST")
# return True
# print self.name,url
# f=open('tmp/ahne882',"r")
# fbody = f.read()
# f.close()
# fmime="text/html; charset=UTF-8"
fu = self.fetchurl(url)
if not fu:
fbody=False
fmime=None
else:
(fmime,fbody)=fu
if fbody == False:
fbody = None
if fbody == body:
sql = "UPDATE plag_crawl SET sched=NOW()+interval '21 days' WHERE url=%s"
cursor.execute(sql,(url,))
                self.log(url,'No change - recrawl in 21 days')
else:
sql = "UPDATE plag_crawl SET sched=NOW()+interval '3 days',body=%s,mime=%s,senx=FALSE WHERE url=%s"
cursor.execute(sql,(fbody,fmime,url))
self.log(url,'changed - recrawl in 3 days')
db.commit()
links = self.findlinks(fbody,url)
for l in links:
url = l['url']
d = depth
if not l['x']:
d = depth+1
else:
d = 0
self.addurl(url,d)
return True
finally:
cursor.close()
self.db_pool.free_conn(db)
    def findlinks(self,body,url):
        """
        Parse an HTML body and return the links found in it.
        Callers pass the page body and the URL it was fetched from.
        """
        if body == None or len(body) == 0:
            return []
        try:
            et = Crawler.EchoTarget(url)
        except:
            traceback.print_exc()
            return []
        parser = etree.HTMLParser(target=et)
        parser.feed(body)
        parser.close()
        return et.links
    def extract_domain(self,url):
        p = urllib.parse.urlparse(url)
        # return tldextract.extract(p.netloc).registered_domain
        return tldextract.TLDExtract(cache_file='/tmp/tld_cache')(p.netloc).registered_domain
def log(self,url,txt):
# return
print("LOG",url,":",txt)
def mime_text_html(self):
print("TEXT HTML IN THE FUNC")
def mime_default(self):
print("DEFAULT")
#mimejump = {'text/html':Updater.fetchurl}
mimejump = {'text/html':mime_text_html}
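# ---------------------------------------------------------------------------
# Minimal usage sketch (an assumption, not part of this revision). It assumes
# that `dbcontrol` provides the pool object used throughout Crawler
# (get_cursor/free_cursor/get_conn/free_conn); how such a pool is constructed
# depends on dbcontrol's API, so the factory call below is only a placeholder.
#
# if __name__ == '__main__':
#     pool = dbcontrol.Pool()            # placeholder: use dbcontrol's real factory
#     crawler = Crawler(pool, db_table='crawl')
#     crawler.db_create_tables()         # run once to create table, sequence, domains
#     crawler.db_addurl('https://www.example.org/', depth=0, priority=1)
#     crawler.start()                    # Crawler is a threading.Thread
#     crawler.join()
# ---------------------------------------------------------------------------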