다음 구조를 만드는게 목표입니다.
1.. 영화 데이터를 서빙하는 rating api가 5000번에 띄워져 있습니다.
2. airflow는 rating api로부터 정보를 가져오고, 가져온 데이터로 랭킹 데이터를 생성합니다.
chapter 8 경로에서 도커를 실행합니다.
docker-compose up -d --build
5000번에 rating api가 띄워져 있는지 확인해 보았습니다.
아직 5000번에 rating api가 띄워지지 않았군요.
rating api 띄우는 중 에러가 발생했습니다.
Traceback (most recent call last):
File "/app.py", line 6, in <module>
from flask import Flask, jsonify, request
File "/usr/local/lib/python3.8/site-packages/flask/__init__.py", line 14, in <module>
from jinja2 import escape
ImportError: cannot import name 'escape' from 'jinja2' (/usr/local/lib/python3.8/site-packages/jinja2/__init__.py)
https://stackoverflow.com/questions/71718167/importerror-cannot-import-name-escape-from-jinja2
Flask 버전을 올리면 된다고 하네요.
관련한 버전들을 수정했습니다.
Flask==2.1.0
pandas==0.25.2
Flask-HTTPAuth==4.6.0
click==8.0.4
서버 실행에 성공했습니다.
ratings api도 잘 호출됩니다.
날짜로 조회하는 것도 됩니다.
/ratings?start_date=2022-10-01&end_date=2022-01-02
다만 최신 날짜로 쌓인 데이터는 없는 것 같네요.
8.1 fetch +rating DAG 만들기
이제 AIRFLOW에서 주기적으로 rating api에서 fetch할 수 있도록, 코드를 만들겠습니다.
사실 이미 github에 이미 구현되어 있습니다.
https://github.com/BasPH/data-pipelines-with-apache-airflow/blob/master/chapter08/dags/01_python.py
def _get_ratings(start_date, end_date, batch_size=100):
session, base_url = _get_session()
yield from _get_with_pagination(
session=session,
url=base_url + "/ratings",
params={"start_date": start_date, "end_date": end_date},
batch_size=batch_size,
)
yield from 구문을 처음 보아서 밑에 적었습니다.
https://docs.python.org/3/reference/expressions.html#yieldexpr
asynchronous하게 데이터를 가져오고 싶을 때 쓰는 것이라고 이해했습니다.
실행해 보겠습니다.
output 결과를 확인하겠습니다.
그런데, airflow scheduler나 webserver container에 들어가도 csv output 파일을 찾지 못했습니다.
(/data/python/ratings/{{ds}}.json)
/var/lib/docker/volumes 경로로 들어가서 ,output 파일을 확인했습니다.
ratings와 rankings 파일은 잘 들어갔습니다.
rankings 파일:
hook 만들기
두 번째 실습은 hook을 만드는 것입니다.
hook도 이미 구현되어 있습니다. (랭킹 로직은 구현되어 있지 않습니다.)
https://github.com/BasPH/data-pipelines-with-apache-airflow/blob/master/chapter08/dags/02_hook.py
connection을 재사용한다는 점이 지난 python DAG와 가장 큰 차이인 것 같습니다.
hook 단어가 무엇인지 궁금해 검색했습니다.
hook is a place and usually an interface provided in packaged code that allows a programmer to insert customized programming
개발자가 customizing할 수 있는 코드를 의미로 이해했습니다.
custom operator
아래 코드에 구현되어 있습니다.
output path를 설정하는 곳을 두 곳에서 한 곳으로 줄였군요.
hook과 마찬가지로 랭킹 로직은 구현되어 있지 않습니다.
1 Comment
김학건
개인적 궁금증
Q. hook, operator, sensor 의 수정이 필요하면 매번, 빌드하고 배포해야할까?
Q. k8s에서 배포한다면 무거워질텐데 이미지가, 관련 패키지 설치하면서 맞는 방향일까?
Q. Operator의 경우 어디까지 로직을 넣어야할까?
Q. op_kwargs와 template_dict는 차이가 뭘까?