分布式网络爬虫_爬虫的基本原理

分布式网络爬虫_爬虫的基本原理不过我们还需要完成一轮抽象 因为后端引擎会处理的事务比较繁琐 它不仅仅包含了存储 还包含了缓存 对表头的拼接 数据的处理等

24 存储引擎

爬虫项目的一个重要的环节就是把最终的数据持久化存储起来,数据可能会被存储到 MySQL、MongoDB、Kafka、Excel 等多种数据库、中间件或者是文件中。

之前我们爬取的案例比较简单,像是租房网站的信息等。但是实际情况下,我们的爬虫任务通常需要获取结构化的数据。例如一本书的信息就包含书名、价格、出版社、简介、评分等。为了生成结构化的数据,这里豆瓣图书为例书写我们的任务规则。

爬取结构化数据

step1 从首页获取热门标签信息

image

const regexpStr = `<a href="([^"]+)" class="tag">([^<]+)</a>` func ParseTag(ctx *collect.Context) (collect.ParseResult, error) { 
    re := regexp.MustCompile(regexpStr) matches := re.FindAllSubmatch(ctx.Body, -1) result := collect.ParseResult{ 
   } for _, m := range matches { 
    result.Requesrts = append( result.Requesrts, &collect.Request{ 
    Method: "GET", Task: ctx.Req.Task, Url: "<https://book.douban.com>" + string(m[1]), Depth: ctx.Req.Depth + 1, RuleName: "书籍列表", }) } return result, nil } 

step2 获取图书列表

image

 const BooklistRe = `<a.*?href="([^"]+)" title="([^"]+)"` func ParseBookList(ctx *collect.Context) (collect.ParseResult, error) { 
    re := regexp.MustCompile(BooklistRe) matches := re.FindAllSubmatch(ctx.Body, -1) result := collect.ParseResult{ 
   } for _, m := range matches { 
    req := &collect.Request{ 
    Method: "GET", Task: ctx.Req.Task, Url: string(m[1]), Depth: ctx.Req.Depth + 1, RuleName: "书籍简介", } //获取到书名之后,将书名缓存到了临时的 tmp 结构中供下一个阶段读取。 //这是因为我们希望得到的某些信息是在之前的阶段获得的。 req.TmpData = &collect.Temp{ 
   } req.TmpData.Set("book_name", string(m[2])) result.Requesrts = append(result.Requesrts, req) } return result, nil } 
// 缓存结构定义为了一个哈希表,并封装了 Get 与 Set 两个函数来获取和设置请求中的缓存。 type Temp struct { 
    data map[string]interface{ 
   } } // 返回临时缓存数据 func (t *Temp) Get(key string) interface{ 
   } { 
    return t.data[key] } func (t *Temp) Set(key string, value interface{ 
   }) error { 
    if t.data == nil { 
    t.data = make(map[string]interface{ 
   }, 8) } t.data[key] = value return nil } 

step3 获取图书详情

最后,图书的详情页,可以看到图书的作者、出版社、页数、定价、得分、价格、简介等信息。

image

 var autoRe = regexp.MustCompile(`<span class="pl"> 作者</span>:[\d\D]*?<a.*?>([^<]+)</a>`) var public = regexp.MustCompile(`<span class="pl">出版社:</span>([^<]+)<br/>`) var pageRe = regexp.MustCompile(`<span class="pl">页数:</span> ([^<]+)<br/>`) var priceRe = regexp.MustCompile(`<span class="pl">定价:</span>([^<]+)<br/>`) var scoreRe = regexp.MustCompile(`<strong class="ll rating_num " property="v:average">([^<]+)</strong>`) var intoRe = regexp.MustCompile(`<div class="intro">[\d\D]*?<p>([^<]+)</p></div>`) func ParseBookDetail(ctx *collect.Context) (collect.ParseResult, error) { 
    bookName := ctx.Req.TmpData.Get("book_name") page, _ := strconv.Atoi(ExtraString(ctx.Body, pageRe)) book := map[string]interface{ 
   }{ 
    "书名": bookName, "作者": ExtraString(ctx.Body, autoRe), "页数": page, "出版社": ExtraString(ctx.Body, public), "得分": ExtraString(ctx.Body, scoreRe), "价格": ExtraString(ctx.Body, priceRe), "简介": ExtraString(ctx.Body, intoRe), } data := ctx.Output(book) result := collect.ParseResult{ 
    Items: []interface{ 
   }{ 
   data}, } return result, nil } func ExtraString(contents []byte, re *regexp.Regexp) string { 
    match := re.FindSubmatch(contents) if len(match) >= 2 { 
    return string(match[1]) } else { 
    return "" } } 

其中,书名是从缓存中得到的。这里仍然使用了正则表达式作为演示,你也可以改为使用更合适的 CSS 选择器。

完整规则

 var DoubanBookTask = &collect.Task{ 
    Property: collect.Property{ 
    Name: "douban_book_list", WaitTime: 1 * time.Second, MaxDepth: 5, Cookie: "xxx" }, Rule: collect.RuleTree{ 
    Root: func() ([]*collect.Request, error) { 
    roots := []*collect.Request{ 
    &collect.Request{ 
    Priority: 1, Url: "<https://book.douban.com>", Method: "GET", RuleName: "数据tag", }, } return roots, nil }, Trunk: map[string]*collect.Rule{ 
    "数据tag": &collect.Rule{ 
   ParseFunc: ParseTag}, "书籍列表": &collect.Rule{ 
   ParseFunc: ParseBookList}, "书籍简介": &collect.Rule{ 
    ItemFields: []string{ 
    "书名", "作者", "页数", "出版社", "得分", "价格", "简介", }, ParseFunc: ParseBookDetail, }, }, }, } 

存储到MySQL

数据抽象

这里将数据抽象成DataCell, 其key定义如下

  • Task: 存储当前任务名
  • Rule: 存储当前的规则名
  • Url: 存储当前网址
  • Time: 存储当前时间
  • Data: 存储当前核心数据,即书籍详细信息
    • Data对应的数据结构又是一个哈希表 map[string]interface{}。
    • 在这个哈希表中,Key 为“书名”“评分”等字段名,Value 为字段对应的值。
    • Data 对应的 Value 不一定需要是 map[string]interface{},只要我们在后面能够灵活地处理不同的类型就可以了。
type DataCell struct { 
    Data map[string]interface{ 
   } } 

输出方法

func (c *Context) Output(data interface{ 
   }) *collector.DataCell { 
    res := &collector.DataCell{ 
   } res.Data = make(map[string]interface{ 
   }) res.Data["Task"] = c.Req.Task.Name res.Data["Rule"] = c.Req.RuleName res.Data["Data"] = data res.Data["Url"] = c.Req.Url res.Data["Time"] = time.Now().Format("2006-01-02 15:04:05") return res } 

数据存储

然后在 HandleResult 方法中对解析后的数据进行存储。

循环遍历 Items,判断其中的数据类型,如果数据类型为 DataCell,我们就要用专门的存储引擎将这些数据存储起来。(存储引擎是和每一个爬虫任务绑定在一起的,不同的爬虫任务可能会有不同的存储引擎。)

func (s *Crawler) HandleResult() { 
    for { 
    select { 
    case result := <-s.out: for _, item := range result.Items { 
    switch d := item.(type) { 
    case *collector.DataCell: name := d.GetTaskName() task := Store.Hash[name] task.Storage.Save(d) } s.Logger.Sugar().Info("get result: ", item) } } } } 

这里选择使用比较常见的 MySQL 数据库作为这个示例的存储引擎。

创建了一个接口 Storage 作为数据存储的接口,Storage 中包含了 Save 方法,任何实现了 Save 方法的后端引擎都可以存储数据。

type Storage interface { 
    Save(datas ...*DataCell) error } 

不过我们还需要完成一轮抽象,因为后端引擎会处理的事务比较繁琐,它不仅仅包含了存储,还包含了缓存、对表头的拼接、数据的处理等。所以,我们要创建一个更加底层的模块,只进行数据的存储。

这个底层抽象的好处在于,我们可以比较灵活地替换底层的存储模块,我在这个例子中使用了原生的 MySQL 语句来与数据库交互。你也可以使用 Xorm 与 Gorm 这样的库来操作数据库。

新建一个文件夹 mysqldb,设置操作数据库的接口 DBer,里面的两个核心函数分别是 CreateTable(创建表)以及 Insert(插入数据)。

type DBer interface { 
    CreateTable(t TableData) error //TableData 包含了表的数据 Insert(t TableData) error } type Field struct { 
    Title string Type string } type TableData struct { 
    TableName string // 表名 ColumnNames []Field // 字段名和字段的属性 Args []interface{ 
   } // 数据 DataCount int // 插入数据的数量 AutoKey bool // 标识是否为表创建自增主键 } 

下面这段代码,我们使用 option 模式生成了 SqlDB 结构体,实现了 DBer 接口。Sqldb.OpenDB 方法用于与数据库建立连接,需要从外部传入远程 MySQL 数据库的连接地址。

 type MySQLDb struct { 
    options db *sql.DB } func New(opts ...Option) (*MySQLDb, error) { 
    options := defaultOptions for _, opt := range opts { 
    opt(&options) } d := &MySQLDb{ 
   } d.options = options if err := d.OpenDB(); err != nil { 
    return nil, err } return d, nil } func (d *MySQLDb) OpenDB() error { 
    db, err := sql.Open("mysql", d.sqlUrl) if err != nil { 
    return err } db.SetMaxOpenConns(2048) db.SetMaxIdleConns(2048) if err = db.Ping(); err != nil { 
    return err } d.db = db return nil } // 创建表 func (d *MySQLDb) CreateTable(t TableData) error { 
    if len(t.ColumnNames) == 0 { 
    return errors.New("Column can not be empty") } sql := `CREATE TABLE IF NOT EXISTS ` + t.TableName + " (" if t.AutoKey { 
    sql += `id INT(12) NOT NULL PRIMARY KEY AUTO_INCREMENT,` } for _, t := range t.ColumnNames { 
    sql += t.Title + ` ` + t.Type + `,` } sql = sql[:len(sql)-1] + `) ENGINE=MyISAM DEFAULT CHARSET=utf8;` d.logger.Debug("crate table", zap.String("sql", sql)) _, err := d.db.Exec(sql) return err } // 插入操作 func (d *MySQLDb) Insert(t TableData) error { 
    if len(t.ColumnNames) == 0 { 
    return errors.New("empty column") } sql := `INSERT INTO ` + t.TableName + `(` for _, v := range t.ColumnNames { 
    sql += v.Title + "," } sql = sql[:len(sql)-1] + `) VALUES ` blank := ",(" + strings.Repeat(",?", len(t.ColumnNames))[1:] + ")" sql += strings.Repeat(blank, t.DataCount)[1:] + `;` d.logger.Debug("insert table", zap.String("sql", sql)) _, err := d.db.Exec(sql, t.Args...) return err } 

存储引擎实现

package sqlstorage import ( "encoding/json" "github.com/funbinary/crawler/collector" "github.com/funbinary/crawler/engine" "github.com/funbinary/crawler/mysqldb" "go.uber.org/zap" ) // 实现 Storage 接口的实现 type MySQLStore struct { 
    dataDocker []*collector.DataCell //分批输出结果缓存 columnNames []mysqldb.Field // 标题字段 db mysqldb.DBer Table map[string]struct{ 
   } options // 选项 } func New(opts ...Option) (*MySQLStore, error) { 
    options := defaultOptions for _, opt := range opts { 
    opt(&options) } s := &MySQLStore{ 
   } s.options = options s.Table = make(map[string]struct{ 
   }) var err error s.db, err = mysqldb.New( mysqldb.WithConnUrl(s.sqlUrl), mysqldb.WithLogger(s.logger), ) if err != nil { 
    return nil, err } return s, nil } func (s *MySQLStore) Save(dataCells ...*collector.DataCell) error { 
    // 循环遍历要存储的 DataCell,并判断当前 DataCell 对应的数据库表是否已经被创建。 for _, cell := range dataCells { 
    name := cell.GetTableName() if _, ok := s.Table[name]; !ok { 
    // 创建表 columnNames := getFields(cell) err := s.db.CreateTable(mysqldb.TableData{ 
    TableName: name, ColumnNames: columnNames, AutoKey: true, }) if err != nil { 
    s.logger.Error("create table falied", zap.Error(err)) } s.Table[name] = struct{ 
   }{ 
   } } // 如果当前的数据小于 s.BatchCount,则将数据放入到缓存中直接返回(使用缓冲区批量插入数据库可以提高程序的性能)。 if len(s.dataDocker) >= s.BatchCount { 
    s.Flush() } // 如果缓冲区已经满了,则调用 SqlStore.Flush() 方法批量插入数据。 s.dataDocker = append(s.dataDocker, cell) } return nil } // getFields 用于获取当前数据的表字段与字段类型,这是从采集规则节点的 ItemFields 数组中获得的。 // 为什么不直接用 DataCell 中 Data 对应的哈希表中的 Key 生成字段名呢? // 这一方面是因为它的速度太慢,另外一方面是因为 Go 中的哈希表在遍历时的顺序是随机的,而生成的字段列表需要顺序固定。 func getFields(cell *collector.DataCell) []mysqldb.Field { 
    taskName := cell.Data["Task"].(string) ruleName := cell.Data["Rule"].(string) fields := engine.GetFields(taskName, ruleName) var columnNames []mysqldb.Field for _, field := range fields { 
    columnNames = append(columnNames, mysqldb.Field{ 
    Title: field, Type: "MEDIUMTEXT", }) } columnNames = append(columnNames, mysqldb.Field{ 
   Title: "Url", Type: "VARCHAR(255)"}, mysqldb.Field{ 
   Title: "Time", Type: "VARCHAR(255)"}, ) return columnNames } // Flush 核心是遍历缓冲区,解析每一个 DataCell 中的数据,将扩展后的字段值批量放入 args 参数中, // 并调用底层 DBer.Insert 方法批量插入数据 func (s *MySQLStore) Flush() error { 
    if len(s.dataDocker) == 0 { 
    return nil } args := make([]interface{ 
   }, 0) for _, datacell := range s.dataDocker { 
    ruleName := datacell.Data["Rule"].(string) taskName := datacell.Data["Task"].(string) fields := engine.GetFields(taskName, ruleName) data := datacell.Data["Data"].(map[string]interface{ 
   }) value := []string{ 
   } for _, field := range fields { 
    v := data[field] switch v.(type) { 
    case nil: value = append(value, "") case string: value = append(value, v.(string)) default: j, err := json.Marshal(v) if err != nil { 
    value = append(value, "") } else { 
    value = append(value, string(j)) } } } value = append(value, datacell.Data["Url"].(string), datacell.Data["Time"].(string)) for _, v := range value { 
    args = append(args, v) } } return s.db.Insert(mysqldb.TableData{ 
    TableName: s.dataDocker[0].GetTableName(), ColumnNames: getFields(s.dataDocker[0]), Args: args, DataCount: len(s.dataDocker), }) } 

存储引擎验证

接下来我们使用运行MySQL, 下面给出两种运行方式:

docker

docker run -d --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD= mysql 

参数说明:

  • -d: 在run后面加上-d参数,则会创建一个守护式容器在后台运行(这样创建容器后不 会自动登录容器,如果只加-i -t 两个参数,创建后就会自动进去容器)。
  • -p: 表示端口映射,前者是宿主机端口,后者是容器内的映射端口。可以使用多个-p 做多个端口映射
  • -e: 为容器设置环境变量
    • MYSQL_ROOT_PASSWORD​ 是MySQL初始root密码, 此变量是必须的
    • MYSQL_DATABASE​ 此变量是可选的,允许您指定要在映像启动时创建的数据库的名称。如果提供了用户/密码(见下文),则该用户将被授予对此数据库的超级用户访问权限(对应于​GRANT ALL``​)。
    • MYSQL_USER​和​ MYSQL_PASSWORD​这些变量是可选的,与创建新用户和设置该用户的密码结合使用。此用户将被授予对变量指定的数据库的超级用户权限(见上文)。这两个变量都是创建用户所必需的。MYSQL_DATABASE
    • MYSQL_ALLOW_EMPTY_PASSWORD​ : 这是一个可选变量。设置为非空值(如 ),以允许使用 root 用户的空白密码启动容器。注意:除非您真的知道自己在做什么,否则不建议将此变量设置为 ,因为这会使您的 MySQL 实例完全不受保护,从而允许任何人获得完整的超级用户访问权限。
    • MYSQL_RANDOM_ROOT_PASSWORD​: 这是一个可选变量。设置为非空值,如 ,为 root 用户生成随机初始密码(使用 )。生成的根密码将打印到 stdout ()。
    • MYSQL_ONETIME_PASSWORD​ : 初始化完成后,将 root(而不是 !中指定的用户)用户设置为已过期,强制在首次登录时更改密码。任何非空值都将激活此设置。注意:此功能仅在MySQL 5.6 +上受支持。在MySQL 5.5上使用此选项将在初始化期间引发适当的错误。
    • MYSQL_INITDB_SKIP_TZINFO​​ : 默认情况下,入口点脚本会自动加载函数所需的时区数据。如果不需要,任何非空值都会禁用时区加载。CONVERT_TZ()

docker-compose

version: '3.1' services: db: image: mysql command: --default-authentication-plugin=mysql_native_password restart: always volumes: - D:\data\mysql:/var/lib/mysql ports: - 3306:3306 environment: MYSQL_ROOT_PASSWORD:  adminer: image: adminer restart: always ports: - 8080:8080 

使用Navicat查看

image

使用DataGrip查看

image

「此文章为4月Day8学习笔记,内容来源于极客时间《Go分布式爬虫实战》,强烈推荐该课程!/推荐该课程」

编程小号
上一篇 2024-12-10 08:42
下一篇 2024-12-10 08:36

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://sigusoft.com/datagrip/3226.html