# Integration of Frontend, Backend, and DAGs
This document explains how the user interacts with the application, from uploading a JSON file to triggering DAGs in Airflow for risk calculations.
## Frontend: Uploading JSON and Selecting the Client
1. The user uploads a JSON file containing the portfolio data.
2. Once the file is uploaded, the client selection dropdown is enabled.
3. The user selects a client and clicks "Calculate Risks".
4. A request is sent to the API with the following structure:
```json
{
  "client_id": "AlphaKlima",
  "portfolio_assets": [
    {
      "id": "1",
      "asset_id": "1",
      "name": "HOTEL Alpha-Klima S.A.",
      "asset_class": "RealEstateAsset",
      "latitude": 41.394,
      "longitude": 2.256,
      ...
    }
  ],
  "portfolio_name": "assets_AlphaKlima.json"
}
```
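As a sketch, the same request can be reproduced from a small Python client script using `requests` (the script itself is hypothetical; the API host `http://localhost:8000` is an assumption, while the endpoint path and field names come from the sections below):

```python
import requests  # hypothetical client script mirroring the frontend call

# Payload mirroring the JSON structure above (elided fields omitted).
payload = {
    "client_id": "AlphaKlima",
    "portfolio_assets": [
        {
            "id": "1",
            "asset_id": "1",
            "name": "HOTEL Alpha-Klima S.A.",
            "asset_class": "RealEstateAsset",
            "latitude": 41.394,
            "longitude": 2.256,
        }
    ],
    "portfolio_name": "assets_AlphaKlima.json",
}


def calculate_risks(base_url: str, body: dict) -> dict:
    """POST the portfolio to the API and return the JSON reply."""
    resp = requests.post(f"{base_url}/api/calculate_portfolio_risks", json=body)
    resp.raise_for_status()
    return resp.json()


if __name__ == "__main__":
    # Assumes the backend is running locally on port 8000.
    print(calculate_risks("http://localhost:8000", payload))
```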
## Backend: Receiving and Processing Data
The FastAPI backend receives the request at:
```python
@app.post("/api/calculate_portfolio_risks")
async def calculate_risks(request: PortfolioRequest):
    return {"message": f"Calculating risks for {request.client_id}"}
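The `PortfolioRequest` model is referenced but not shown above; a minimal Pydantic sketch matching the example JSON (field names taken from the request payload; `extra = "allow"` stands in for the elided asset fields) might look like:

```python
from typing import List

from pydantic import BaseModel


class PortfolioAsset(BaseModel):
    """One entry of portfolio_assets; extra fields are accepted as-is."""

    id: str
    asset_id: str
    name: str
    asset_class: str
    latitude: float
    longitude: float

    class Config:
        extra = "allow"  # assets may carry additional fields beyond these


class PortfolioRequest(BaseModel):
    """Body of POST /api/calculate_portfolio_risks."""

    client_id: str
    portfolio_assets: List[PortfolioAsset]
    portfolio_name: str
```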
The backend extracts the data and converts it into a DataFrame for processing.
It then triggers an Airflow DAG, passing the data inside the `conf` parameter:
```python
payload = {
    "dag_run_id": f"manual__{now}",
    "conf": request.dict()
}
```
The backend makes an HTTP request to Airflow to start the DAG:
```python
requests.post(f"{AIRFLOW_URL}/dags/insert_portfolio_dag/dagRuns", json=payload)
```
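Putting the two snippets together, a hedged sketch of the trigger step (the `AIRFLOW_URL` value and the basic-auth credentials below are placeholders for this deployment; the DAG id `insert_portfolio_dag` comes from the snippet above):

```python
from datetime import datetime, timezone

import requests

# Assumption: Airflow 2.x stable REST API base URL; adjust to your deployment.
AIRFLOW_URL = "http://localhost:8080/api/v1"


def build_dag_run_payload(conf: dict) -> dict:
    """Build the dagRuns body: a unique manual run id plus the portfolio as conf."""
    now = datetime.now(timezone.utc).isoformat()
    return {"dag_run_id": f"manual__{now}", "conf": conf}


def trigger_insert_portfolio_dag(conf: dict) -> dict:
    """Trigger the DAG over Airflow's REST API (credentials are placeholders)."""
    payload = build_dag_run_payload(conf)
    resp = requests.post(
        f"{AIRFLOW_URL}/dags/insert_portfolio_dag/dagRuns",
        json=payload,
        auth=("airflow", "airflow"),  # placeholder basic-auth credentials
    )
    resp.raise_for_status()
    return resp.json()
```

Generating `dag_run_id` from the current timestamp keeps manual runs unique, which Airflow requires for repeated triggers of the same DAG.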
## DAG in Airflow: Risk Calculation
The DAG retrieves the data and converts it into a DataFrame:
```python
import pandas as pd


def process_portfolio(data):
    df = pd.DataFrame(data["portfolio_assets"])
    df["location_coordinates"] = df.apply(
        lambda row: f"POINT({row['longitude']} {row['latitude']})",
        axis=1,
    )
    return df
```
It inserts the data into a PostgreSQL database.
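A minimal sketch of that insert step, assuming a `portfolio_assets` table and using SQLAlchemy's `to_sql` (the demo engine below is in-memory SQLite so the snippet is self-contained; the real DAG would use a PostgreSQL DSN instead):

```python
import pandas as pd
from sqlalchemy import create_engine


def insert_portfolio(df: pd.DataFrame, engine) -> int:
    """Append the processed portfolio DataFrame to the database; return row count."""
    df.to_sql("portfolio_assets", engine, if_exists="append", index=False)
    return len(df)


# Demo with an in-memory SQLite engine; for the real DAG, swap the URL for a
# PostgreSQL DSN, e.g. create_engine("postgresql+psycopg2://user:pass@host/db").
engine = create_engine("sqlite://")
df = pd.DataFrame(
    [
        {
            "asset_id": "1",
            "name": "HOTEL Alpha-Klima S.A.",
            "location_coordinates": "POINT(2.256 41.394)",
        }
    ]
)
rows = insert_portfolio(df, engine)
```

Storing coordinates as WKT strings like `POINT(lon lat)` keeps the sketch database-agnostic; with PostGIS available, the column could be cast to a proper `geometry` type instead.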
The DAG performs financial risk calculations.
Once completed, it sends a response back to the frontend.
## Conclusion
This flow enables users to:

- Upload JSON files containing portfolios.
- Select a client before performing risk calculations.
- Automatically trigger DAGs via API requests.
- Retrieve processed risk calculations efficiently.