Scrapy 是 Python 開發的一個快速,高層次的屏幕抓取和 web 抓取框架,用於抓取 web 站點並從頁面中提取結構化的數據。 Scrapy 用途廣泛,可以用於數據挖掘、監測和自動化測試。本文作者 Erdem İşbilen 為我們演示瞭如何使用 Python 和 Scrapy 怎樣在半個小時內對 10 個在線商店抓取信息。有了 Python 和 Scrapy,我們就可以完成大量的工作,而不需要自己費大力氣去開發。
獲取啟動 App 項目所需的源數據是一步。即便你是全棧開發人員,希望開發一款出色的 Web 應用程序,並能全身心投入到項目中。在編寫代碼之前,仍然需要一個與領域相關的數據集。這是因為現代應用程序會同時或成批處理大量數據,以便為其用戶提供價值。本文,我將解釋生成這樣一個數據集的工作流程。你將會看到,我在沒有任何人工干預的情況下是如何對許多網站進行自動網頁抓取的。
我的目標是為價格比較網絡應用程序生成一個數據集。我將使用的產品類別以手提袋為例。對於這樣的應用,應該每天從不同的在線商店那裡收集手提包的產品信息和價格信息。儘管有些在線商店提供了 API 讓你訪問所需的信息,但並非所有在線商店都會這麼做。所以,網頁抓取不可避免。
在本文的示例中,我將使用 Python 和 Sparky 為 10 個不同的在線商店生成網絡蜘蛛(Web spider)。然後,我將使用 Apache Airflow 自動化這一過程,這樣就不需要人工干預來定期執行整個過程。
源代碼和現場演示 Web 應用程序
你可以在GitHub 倉庫 找到所有相關的源代碼,也可以訪問在線 Web 應用程序,使用的是網頁抓取項目提供的數據。
在開始任何網頁抓取項目之前,必須先定義哪些網站將包含在項目中。我決定抓取 10 個網站,這些網站是土耳其手提包類別中訪問量最大的在線商店。
步驟 1:安裝 Scrapy 並設置項目文件夾
在創建 Scrapy 蜘蛛之前,必須將 Scrapy 安裝到計算機中,並生成 Scrapy 項目。請查看下面的帖子了解更多的信息。
Fuel Up the Deep Learning: Custom Dataset Creation with Web Scraping(推動深度學習:使用網頁抓取創建自定義數據集)
#安装 Scrapy
$ pip install scrapy
#安装用于下载产品图片的图像
$ pip install image
#使用 Scrapy 开始网页抓取项目
$ scrapy startproject fashionWebScraping
$ cd fashionWebScraping
$ ls
#创建项目文件夹,如下所述
$ mkdir csvFiles
$ mkdir images_scraped
$ mkdir jsonFiles
$ mkdir utilityScripts
項目文件夾和文件
項目的文件夾結構
我在本地計算機上創建了一個文件夾結構,將項目文件整齊地放入不同文件夾。csvFiles
文件夾包含了每個被抓取的網站的 CSV 文件。網絡蜘蛛將從那些 CSV 文件中讀取“起始 URL”來啟動網頁抓取,因為我不想在網絡蜘蛛中對它們進行硬編碼。
fashionWebScraping
文件夾包含 Scrapy 蜘蛛和助手腳本,比如 settings.py
、item.py
和 pipelines.py
。我們必須修改其中一些 Scrapy 助手腳本,才能成功執行網頁抓取過程。
抓取的產品圖像將保存在 images_scraped
文件夾中。
在網頁抓取過程中,所有的產品信息,如價格、名稱、產品鏈接和圖像鏈接都將存儲在 jsonFiles
文件夾中的 JSON 文件中。
deldub.py
用於在網頁抓取結束後,檢測並刪除 JSON 文件中重複的產品信息。jsonPrep.py
是另一個實用程序腳本,用於在網頁抓取結束後,檢測並刪除 JSON 文件中的空行項。deleteFiles.py
可刪除在上一次網頁抓取會話中生成的所有 JSON 文件。jsonToes.py
通過讀取 JSON 文件,在遠程位置填充 ElasticSearch 集群。這是提供實施全文搜索體驗所必需的。sitemap_gen.py
用於生成涵蓋所有產品鏈接的站點地圖。
步驟 2:理解特定網站的 URL 結構並為起始 URL 填充 CSV 文件
創建項目文件夾後,下一步就是使用我們要抓取的每個網站的起始 URL 來填充 CSV 文件。幾乎所有的電子商務網站都提供分頁功能,以便通過產品列表為用戶導航。每次導航到下一頁時,URL 中的 page
參數都會增加。請參見下面的示例 URL,其中使用了 page
參數。
https://www.derimod.com.tr/kadin-canta-aksesuar/?page=1
我將使用 {}
佔位符,這樣就可以通過增加 page
的值來對 URL 進行迭代。我還將使用 CSV 文件中的 gender
列來定義特定 URL 的性別類別。
因此,最終的 CSV 文件看起來如下圖所示:
同樣的原則,也適用於項目中的其他網站,可以在我的 GitHub 倉庫中找到已填充的 CSV 文件。
步驟 3:修改 items.py
和 settings.py
文件
要開始網頁抓取,我們必須修改 items.py
來定義用於存儲抓取數據的 item objects
。
為了定義通用輸出數據格式,Scrapy 提供了
Item
類。Item
對像是適用於收集抓取的數據的簡單容器。它們提供了一個類似字典的 API,有一個方便的語法來生命它們可用的字段。引自 scrapy.org
# fashionWebScraping 文件夹中的 items.py
import scrapy
from scrapy.item import Item, Field
class FashionwebscrapingItem(scrapy.Item):
#与产品相关的项,如 Id、名称、价格等
gender=Field()
productId=Field()
productName=Field()
priceOriginal=Field()
priceSale=Field()
#要存储链接的项
imageLink = Field()
productLink=Field()
#公司名称项
company = Field()
pass
class ImgData(Item):
#用于下载产品图像的图像管道项
image_urls=scrapy.Field()
images=scrapy.Field()
然後,我們必須修改 settings.py
。這是自定義網絡蜘蛛的圖像管道和行為所必需的。
通過 Scrapy 設置,你可以自定義所有 Scrapy 組件的行為,包括核心、擴展、管道和網絡蜘蛛本身。
引自 scrapy.org
# fashionWebScraping 文件夹中的 settings.py
# fashionWebScraping 项目的 Scrapy 设置
# 为简单起见,此文件仅包含被认为重要或常用的设置。你可以参考文档找到更多的设置:
# https://doc.scrapy.org/en/latest/topics/settings.html
# https://doc.scrapy.org/en/latest/topics/downloader-middleware.html
# https://doc.scrapy.org/en/latest/topics/spider-middleware.html
BOT_NAME = 'fashionWebScraping'
SPIDER_MODULES = ('fashionWebScraping.spiders')
NEWSPIDER_MODULE = 'fashionWebScraping.spiders'
# 通过在用户代理上标识自己(和你的网站),负责任的抓取。
USER_AGENT = 'fashionWebScraping'
# 遵守 rebots.txt 规则
ROBOTSTXT_OBEY = True
# 参阅 https://doc.scrapy.org/en/latest/topics/settings.html
# 下载延迟
# 另请参阅 autothrottle 的设置和文档
# 这样可以避免对服务器造成太大的压力
DOWNLOAD_DELAY = 1
# 重写默认的请求报头:
DEFAULT_REQUEST_HEADERS = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'tr',
}
# 配置项目管道
# 参阅 https://doc.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {'scrapy.pipelines.images.ImagesPipeline': 1}
IMAGES_STORE = '/Users/erdemisbilen/Angular/fashionWebScraping/images_scraped'
item.py
和 settings.py
對我們項目中的所有網絡蜘蛛都有效。
步驟 4:構建網絡蜘蛛
Scrapy Spiders 是用於定義如何抓取某個站點(或一組站點)的類,包括如何執行抓取(即跟隨鏈接),以及如何從其頁面中提取結構化數據(即抓取項目)。換言之,Spiders 是你定義特定站點(或者在某些情況下為一組站點)的網絡蜘蛛和解析頁面的自定義行為的地方。
引自 scrapy.org
fashionWebScraping $ scrapy genspider fashionBOYNER boyner.com
在模塊中使用
basic
模板創建網絡蜘蛛fashionBOYNER
:fashionWebScraping.spiders.fashionBOYNER
上面的 shell 命令創建一個空的 spider 文件。讓我們將這些代碼寫入到我們的 fashionBOYNER.py 文件中:
# fashionWebScraping/Spiders 文件夹中的 'fashionBOYNER.py' 文件
# 导入 scrapy 和 scrapy 项
import scrapy
from fashionWebScraping.items import FashionwebscrapingItem
from fashionWebScraping.items import ImgData
from scrapy.http import Request
# 从 csv 文件读取
import csv
class FashionboynerSpider(scrapy.Spider):
name = 'fashionBOYNER'
allowed_domains = ('BOYNER.com')
start_urls = ('http://BOYNER.com/')
# 此函数通过跟踪 csv 文件中的起始 URL 来帮助我们抓取网站的全部内容
def start_requests(self):
# 从 csv 文件读取主类别 URL
with open ("/Users/erdemisbilen/Angular/fashionWebScraping/
csvFiles/SpiderMainCategoryLinksBOYNER.csv", "rU") as f:
reader=csv.DictReader(f)
for row in reader:
url=row('url')
# 以增量方式更改 page 值来浏览产品列表
# 你可以根据最大值来调整产品数量的范围值,默认是抓取 30 个网页
link_urls = (url.format(i) for i in range(1,30))
for link_url in link_urls:
print(link_url)
# 将包含产品的每个链接传递给带有性别元数据的 parse_ product_pages 函数
request=Request(link_url, callback=self.parse_product_pages,
meta={'gender': row('gender')})
yield request
# 此函数在 xpath 的帮助下抓取网页数据
def parse_product_pages(self,response):
item=FashionwebscrapingItem()
# 获取 HTML 块,其中列出了所有的产品。
# HTML 元素,具有"product-list-item" 类名
content=response.xpath('//div(starts-with(@class,"product-list-
item"))')
# 循环遍历内容中的每个元素
for product_content in content:
image_urls = ()
# 获取产品详细信息并填充项
item('productId')=product_content.xpath('.//a/@data
-id').extract_first()
item('productName')=product_content.xpath('.//img/@title').
extract_first()
item('priceSale')=product_content.xpath('.//ins(@class=
"price-payable")/text()').extract_first()
item('priceOriginal')=product_content.xpath('.//del(@class=
"price-psfx")/text()').extract_first()
if item('priceOriginal')==None:
item('priceOriginal')=item('priceSale')
item('imageLink')=product_content.xpath('.//img/
@data-original').extract_first()
item('productLink')="https://www.boyner.com.tr"+
product_content.xpath('.//a/@href').extract_first()
image_urls.append(item('imageLink'))
item('company')="BOYNER"
item('gender')=response.meta('gender')
if item('productId')==None:
break
yield (item)
# 下载 image_urls 中包含的图像
yield ImgData(image_urls=image_urls)
def parse(self, response):
pass
我們的 spider 類包含兩個函數,即 start_requests
和 parse_product_pages
。
在 start_requests
函數中,我們從已經生成的特定 CSV 文件中讀取起始 URL 信息。然後,我們對占位符 {}
進行迭代,將產品頁面的 URL 傳遞給 parse_product_pages
函數。
我們還可以使用 meta={‘gender’: row(‘gender’)}
參數將 gender
元數據傳遞給 Request
方法中的 parse_product_pages
函數。
在 parse_product_pages
函數中,我們執行實際的網頁抓取,並用抓取的數據填充 Scrapy 項。
我使用 Xpath 來定位網頁上包含產品信息的 HTML 部分。
下面的第一個 XPath 表達式從當前正在抓取的網頁中提取整個產品列表。所有必需的產品信息都包含在內容的 div
元素中。
# // 从当前节点中选择与所选内容匹配的节点,无论它们在何处
#//'//div(starts-with(@class,"product-list-item"))'选择所有的 div 元素,这些元素具有类值 start
content = response.xpath('//div(starts-with(@class,"product-list-item"))')
我們需要遍歷 content
以獲取各個產品,並將它們存儲在 Scrapy 項中。借助 XPath 表達式,我們可以很容易地在 content
中找到所需的 HTML 元素。
# 遍历内容中的每个 元素
for product_content in content:
image_urls = ()
# 获取产品详细信息并填充项
# ('.//a/@data-id') 提取 product_content 中的 元素的 'data-id' 值
item('productId')=product_content.xpath('.//a/@data
-id').extract_first()
# ('.//img/@title') 提取 product_content 中的
元素的 `title` 值
item('productName')=product_content.xpath('.//img/@title').
extract_first()
# ('.//ins(@class= "price-payable")/text()') 提取 元素的文本值,该元素具有 product_content 中的 `price-payable` 类别属性。
item('priceSale')=product_content.xpath('.//ins(@class=
"price-payable")/text()').extract_first()
# ('.//del(@class="price-psfx")/text()') 提取 元素的文本值,该元素具有 product_content 中的 `price-psfx` 类属性。
item('priceOriginal')=product_content.xpath('.//del(@class=
"price-psfx")/text()').extract_first()
if item('priceOriginal')==None:
item('priceOriginal')=item('priceSale')
# ('.//img/@data-original') 提取 product_content 中的
元素的 `data-original` 值
item('imageLink')=product_content.xpath('.//img/
@data-original').extract_first()
# ('.//a/@href') 提取 product_content 中的 元素的 `href`值
item('productLink')="https://www.boyner.com.tr"+
product_content.xpath('.//a/@href').extract_first()
# 将产品图片链接分配到图像管道定义的 `image_urls` 中
image_urls.append(item('imageLink'))
item('company')="BOYNER"
item('gender')=response.meta('gender')
if item('productId')==None:
break
yield (item)
**# 下载 image_urls 中包含的图像
yield ImgData(image_urls=image_urls)
同樣的原則也適用於其他網站。你可以在我的 GitHub 倉庫中看到所有 10 個網絡蜘蛛的代碼。
步驟 5:運行網絡蜘蛛並將抓取的數據存儲在 JSON 文件中
在抓取過程中,每個產品項都存儲在一個 JSON 文件中。每個網站都有一個特定的 JSON 文件,該文件在每次網絡蜘蛛抓取時都填充了數據。
fashionWebScraping $ scrapy crawl -o rawdata_BOYNER.json -t jsonlines fashionBOYNER
與 JSON 格式相比,使用 jsonlines 格式的內存效率要高得多,特別是當你在一個會話中抓取了大量網頁數據時。
注意,JSON 文件名以 rawdata
開頭,這表明下一步是在我們的應用程序中使用抓取的原始數據之前,檢查並驗證它們。
步驟 6:清理並驗證 JSON 文件中的抓取數據
在網頁抓取過程結束之後,你可能需要先從 JSON 文件中刪除一些行項,然後才能在應用程序中使用它們。
JSON 文件中可能有那些帶有空字段或重複值的行項。這兩種情況,都需要一個修正過程,我使用的是 jsonPrep.py
和 deldub.py
來處理這兩種情況。
jsonPrep.py
查找帶有空值的行項,並在檢測到它們時將其刪除。你可以查看下面的帶有解釋的示例代碼:
# fashionWebScraping/utilityScripts 文件夹中的 `jsonPrep.py`文件
import json
import sys
from collections import OrderedDict
import csv
import os
# 从 jsonFiles.csv 文件中读取需要验证的所有 json 文件的名称和位置
with open("/Users/erdemisbilen/Angular/fashionWebScraping/csvFiles/ jsonFiles.csv", "rU") as f:reader=csv.DictReader(f)
# 迭代 jsonFiles.csv 中列出的 json 文件
for row in reader:
# 从 jsonFiles.csv 文件中读取 jsonFile_raw 列
jsonFile=row('jsonFile_raw')
# 打开 the jsonFile
with open(jsonFile) as json_file:
data = ()
i = 0
seen = OrderedDict()
# 对 json 文件中的行进行迭代
for d in json_file:
seen = json.loads(d)
# 如果产品 Id 为空,则不包括行项
try:
if seen("productId") != None:
for key, value in seen.items():
print("ok")
i = i + 1
data.append(json.loads(d))
except KeyError:
print("nok")
print (i)
baseFileName=os.path.splitext(jsonFile)(0)
# 通过从 `file_name_prep` 行读取 filename,将结果写成 json 文件
with open('/Users/erdemisbilen/Angular/fashionWebScraping/
jsonFiles/'+row('file_name_prep'), 'w') as out:
json.dump(data, out)
在刪除空行項之後,將結果保存到 jsonFiles
項目文件夾中,文件名以 prepdata
開頭。
deldub.py
查找重複的行項,並在檢測到時將其刪除。你可以查看下面的帶有解釋的示例代碼:
# fashionWebScraping/utilityScripts 文件夹中的 'deldub.py' 文件
import json
import sys
from collections import OrderedDict
import csv
import os
# 从 jsonFiles.csv 文件中读取所有 json 文件的名称和位置,需要验证重复的行项
with open("/Users/erdemisbilen/Angular/fashionWebScraping/csvFiles/ jsonFiles.csv", newline=None) as f:
reader=csv.DictReader(f)
# 迭代 jsonFiles.csv 中列出的 json 文件
for row in reader:
# 从 jsonFile.csv 文件中读取 jsonFile_raw 列
jsonFile=row('jsonFile_prep')
# 打开 jsonFile
with open(jsonFile) as json_file:
data = json.load(json_file)
seen = OrderedDict()
dubs = OrderedDict()
# 对 json 文件的行进行迭代
for d in data:
oid = d("productId")
# 如果产品 Id 具有重复值,则不包括该项
if oid not in seen:
seen(oid) = d
else:
dubs(oid)=d
baseFileName=os.path.splitext(jsonFile)(0)
# 通过从 `file_name_final` 读取 filename,将结果写成 json 文件
with open('/Users/erdemisbilen/Angular/fashionWebScraping/
jsonFiles/'+row('file_name_final'), 'w') as out:
json.dump(list(seen.values()), out)
with open('/Users/erdemisbilen/Angular/fashionWebScraping/
jsonFiles/'+'DELETED'+row('file_name_final'), 'w') as out:
json.dump(list(dubs.values()), out)
在刪除重複的行項之後,結果以finaldata
開頭的文件名保存到 jsonFiles
項目文件夾中。
使用 Apache Airflow 實現自動化
一旦我們定義了網頁抓取過程後,就可以進入工作流程自動化了。我將使用 Apache Airflow,這是 Airbnb 開發的,基於 Python 的工作流自動化工具。
我將提供安裝和配置 Apache Airflow 的終端命令,你可以參考我下面的帖子進一步了解更多的細節:
My Deep Learning Journey: From Experimentation to Production(我的深度學習之旅:從實驗到生產)
$ python3 --version
Python 3.7.3$ virtualenv --version
15.2.0$ cd /path/to/my/airflow/workspace
$ virtualenv -p `which python3` venv
$ source venv/bin/activate
(venv) $ pip install apache-airflow
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=`pwd`/airflow_home
(venv) $ airflow initdb
(venv) $ airflow webserver
創建 DAG 文件
在 Airflow 中,DAG 或有向無環圖(Directed Acyclic Graph)是你希望運行的所有任務的集合,其組織方式反映了它們之間的關係和依賴關係。
例如,一個簡單的 DAG 可以由三個任務組成:A、B 和 C。可以說,A 必須先成功運行,B 才能運行,但是 C 可以隨時運行。可以說任務 A 在 5 分鐘後超時,而任務 B 在失敗的情況下,最多可以重啟 5 次。也可以說,工作流程將在每晚 10 點運行,但不應在某個特定日期才開始。
DAG 是在 Python 文件中定義的,用於組織任務流。我們不會在 DAG 文件中定義實際的任務。
讓我們創建一個 DAG 文件夾和一個空的 Python 文件,開始用 Python 代碼定義我們的工作流程。
(venv) $ mkdir dags
在一個 DAG 文件中,有幾個由 Airflow 提供的操作符來描述任務。我在下面列出了幾個常用的。
BashOperator
:執行 bash 命令。
PythonOperator
:調用任意 Python 函數。
-
EmailOperator
:發送一封電子郵件。
SimleHttpOperator
:發送一個 HTTP 請求。
Sensor
:等待特定的時間、文件、數據庫行、S3 鍵等等。
目前我計劃只使用 BashOperator
,因為我將使用 Python 腳本完成所有任務。
因為我將使用 BashOperator
,所以最好有一個 bash 腳本,其中包含了一個特定任務的所有命令,以簡化 DAG 文件。
根據本教程,我為每個任務生成了 bash 腳本。你可以在我的 GitHub 倉庫 中找到它們。
然後,我可以使用創建的 bash 命令編寫我的 DAG 文件,如下所示。使用下面的配置,我的任務將由 Airflow 每天安排和執行。你可以根據需要,更改 DAG 文件中的開始日期或計劃間隔。你還可以使用本地執行器或 celery 執行器並行運行任務實例。由於我使用的是最原始的執行器——順序執行器,因此,我所有的任務實例都將按順序工作。
# Airflow dag 文件夹中的'fashionsearch_dag.py'文件
import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 11, 23),
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(dag_id='fashionsearch_dag', default_args=default_args, schedule_interval=timedelta(days=1))
#此任务将删除在以前抓取会话中生成的所有 json 文件
t1 = BashOperator(
task_id='delete_json_files',
bash_command='run_delete_files',
dag=dag)
# 此任务运行 www.boyner.com 的网络蜘蛛,并使用所抓取的数据填充相关的 json 文件
t2 = BashOperator(
task_id='boyner_spider',
bash_command='run_boyner_spider',
dag=dag)
# 此任务运行 www.derimod.con 的网络蜘蛛,并使用抓取的数据填充相关的 json 文件
t3 = BashOperator(
task_id='derimod_spider',
bash_command='run_derimod_spider',
dag=dag)
# 此任务运行 www.hepsiburada.com 的网络蜘蛛,并使用抓取的数据填充相关的 json 文件
t4 = BashOperator(
task_id='hepsiburada_spider',
bash_command='run_hepsiburada_spider',
dag=dag)
# 此任务运行 www.hm.com 的网络蜘蛛,并使用抓取的数据填充相关的 json 文件
t5 = BashOperator(
task_id='hm_spider',
bash_command='run_hm_spider',
dag=dag)
# 此任务运行 www.koton.com 的网络蜘蛛,并使用抓取的数据填充相关的 json 文件
t6 = BashOperator(
task_id='koton_spider',
bash_command='run_koton_spider',
dag=dag)
# 此任务运行 www.lcwaikiki.com 的网络蜘蛛,并使用抓取的数据填充相关的 json 文件
t7 = BashOperator(
task_id='lcwaikiki_spider',
bash_command='run_lcwaikiki_spider',
dag=dag)
# 此任务运行 www.matmazel.com 的网络蜘蛛,并使用抓取的数据填充相关的 json 文件
t8 = BashOperator(
task_id='matmazel_spider',
bash_command='run_matmazel_spider',
dag=dag)
# 此任务运行 www.modanisa.com 的网络蜘蛛,并使用抓取的数据填充相关的 json 文件
t9 = BashOperator(
task_id='modanisa_spider',
bash_command='run_modanisa_spider',
dag=dag)
# 此任务运行 www.morhipo.com 的网络蜘蛛,并使用抓取的数据填充相关的 json 文件
t10 = BashOperator(
task_id='morhipo_spider',
bash_command='run_morhipo_spider',
dag=dag)
# 此任务运行 www.mudo.com 的网络蜘蛛,并使用抓取的数据填充相关的 json 文件
t11 = BashOperator(
task_id='mudo_spider',
bash_command='run_mudo_spider',
dag=dag)
# 此任务运行 www.trendyol.com 的网络蜘蛛,并使用抓取的数据填充相关的 json 文件
t12 = BashOperator(
task_id='trendyol_spider',
bash_command='run_trendyol_spider',
dag=dag)
# 此任务运行 www.yargici.com 的网络蜘蛛,并使用抓取的数据填充相关的 json 文件
t13 = BashOperator(
task_id='yargici_spider',
bash_command='run_yargici_spider',
dag=dag)
# 此任务检查并删除 json 文件中的空行项
t14 = BashOperator(
task_id='prep_jsons',
bash_command='run_prep_jsons',
dag=dag)
# 此任务检查并删除 json 文件中的重复行项
t15 = BashOperator(
task_id='delete_dublicate_lines',
bash_command='run_del_dub_lines',
dag=dag)
# 此任务使用 JSON 文件中的数据填充远程 ES 集群
t16 = BashOperator(
task_id='json_to_elasticsearch',
bash_command='run_json_to_es',
dag=dag)
# 对于顺序执行器,所有任务都依赖于前一个任务
# 不可能并行执行任务
# 至少在执行并行任务时使用本地执行器
t1 >> t2 >> t3 >> t4 >> t5 >> t6 >> t7 >> t8 >> t9 >> t10 >> t11 >> t12 >> t13 >> t14 >> t15 >> t16
要啟動 DAG 工作流程,我們需要運行 Airflow Scheduler。這將使用 airflow.cfg
文件中指定的配置執行計劃程序。調度器(Scheduler)監視位於 dags
文件夾中的每個 DAG 中的每個任務,如果任務的依賴關係已經滿足,就會觸發任務的執行。
(venv) $ airflow scheduler
一旦運行了 Airflow 調度器,我們就可以通過瀏覽器訪問 http://0.0.0.0:8080 來查看任務的狀態。 Airflow 提供了一個用戶界面,我們可以在其中查看並跟踪計劃的 DAG。

AirFlow DAG 圖形視圖

AirFlow DAG 樹狀視圖
結論
本文演示了我從頭到尾實現網頁抓取的整個工作流。我希望本文能夠讓你掌握網頁抓取和工作流自動化的基礎知識。
作者介紹:
Erdem İşbilen,汽車工程師、機械工程師。有豐富的項目質量團隊領導經驗,自 2001 年以來,在汽車行業工作,有著豐富的項目質量團隊領導經驗,擁有海峽大學(Bogazici University)汽車工程專業的理學碩士學位。愛好深度學習和機器學習。
原文鏈接: