다음 구조를 만드는게 목표입니다.


1.. 영화 데이터를 서빙하는 rating api가 5000번에 띄워져 있습니다.

2. airflow는 rating api로부터 정보를 가져오고, 가져온 데이터로 랭킹 데이터를 생성합니다.


chapter 8 경로에서 도커를 실행합니다.

docker-compose up -d --build


5000번에 rating api가 띄워져 있는지 확인해 보았습니다.

아직 5000번에 rating api가 띄워지지 않았군요.


rating api 띄우는 중 에러가 발생했습니다.

Traceback (most recent call last):
  File "/app.py", line 6, in <module>
    from flask import Flask, jsonify, request
  File "/usr/local/lib/python3.8/site-packages/flask/__init__.py", line 14, in <module>
    from jinja2 import escape
ImportError: cannot import name 'escape' from 'jinja2' (/usr/local/lib/python3.8/site-packages/jinja2/__init__.py)

https://stackoverflow.com/questions/71718167/importerror-cannot-import-name-escape-from-jinja2

Flask 버전을 올리면 된다고 하네요.

관련한 버전들을 수정했습니다.

Flask==2.1.0
pandas==0.25.2
Flask-HTTPAuth==4.6.0
click==8.0.4




서버 실행에 성공했습니다.


ratings api도 잘 호출됩니다.


날짜로 조회하는 것도 됩니다.

/ratings?start_date=2022-10-01&end_date=2022-01-02

다만 최신 날짜로 쌓인 데이터는 없는 것 같네요.

8.1 fetch +rating DAG 만들기

이제 AIRFLOW에서 주기적으로 rating api에서 fetch할 수 있도록, 코드를 만들겠습니다.

사실 이미 github에 이미 구현되어 있습니다.

https://github.com/BasPH/data-pipelines-with-apache-airflow/blob/master/chapter08/dags/01_python.py

def _get_ratings(start_date, end_date, batch_size=100):
    session, base_url = _get_session()

    yield from _get_with_pagination(
        session=session,
        url=base_url + "/ratings",
        params={"start_date": start_date, "end_date": end_date},
        batch_size=batch_size,
    )

yield from 구문을 처음 보아서 밑에 적었습니다.

https://docs.python.org/3/reference/expressions.html#yieldexpr

asynchronous하게 데이터를 가져오고 싶을 때 쓰는 것이라고 이해했습니다.



실행해 보겠습니다.


output 결과를 확인하겠습니다.

그런데, airflow scheduler나 webserver container에 들어가도 csv output 파일을 찾지 못했습니다.

(/data/python/ratings/{{ds}}.json)


/var/lib/docker/volumes 경로로 들어가서 ,output 파일을 확인했습니다.

ratings와 rankings 파일은 잘 들어갔습니다.


rankings 파일:

hook 만들기

두 번째 실습은 hook을 만드는 것입니다.

hook도 이미 구현되어 있습니다. (랭킹 로직은 구현되어 있지 않습니다.)

https://github.com/BasPH/data-pipelines-with-apache-airflow/blob/master/chapter08/dags/02_hook.py

connection을 재사용한다는 점이 지난 python DAG와 가장 큰 차이인 것 같습니다.


hook 단어가 무엇인지 궁금해 검색했습니다.

hook is a place and usually an interface provided in packaged code that allows a programmer to insert customized programming

개발자가 customizing할 수 있는 코드를 의미로 이해했습니다.


custom operator

아래 코드에 구현되어 있습니다.

https://github.com/BasPH/data-pipelines-with-apache-airflow/blob/master/chapter08/dags/03_operator.py

output path를 설정하는 곳을 두 곳에서 한 곳으로 줄였군요.

hook과 마찬가지로 랭킹 로직은 구현되어 있지 않습니다.

sensor

https://github.com/BasPH/data-pipelines-with-apache-airflow/blob/40b52ba6fcda3203b194be9e1c2850135997215a/chapter08/dags/custom/sensors.py