How to pull data from a paginated JSON API using kedro (APIDataSet)?

The problem: I would like to retrieve data from a paginated API that sends JSON responses. Using kedro.extras.datasets.api.APIDataSet, I can query the API and retrieve the initial response. However, if there are more results than the size limit per API request, I need to traverse the pagination links in the JSON responses. Has anybody successfully done this already?

Should I subclass APIDataSet for this and put the link traversal logic in the _execute_request() method? The provided APIDataSet returns requests.Response objects. Should a subclassed APIDataSet return (or yield) the results directly?

I tried this approach and it works to retrieve the data. But I am unsure if this is the "kedro way" to do it. Should the traversal logic be done in a node instead?

import copy
from typing import Any, Dict, Iterable, List, Union

import dpath.util
import requests
from kedro.extras.datasets.api import APIDataSet
from kedro.io.core import DataSetError
from requests.auth import AuthBase

class PaginatedJSONAPIDataSet(APIDataSet):
    def __init__(
        self,
        url: str,
        method: str = "GET",
        data: Any = None,
        params: Dict[str, Any] = None,
        headers: Dict[str, Any] = None,
        auth: Union[Iterable[str], AuthBase] = None,
        json: Union[List, Dict[str, Any]] = None,
        timeout: int = 60,
        credentials: Union[Iterable[str], AuthBase] = None,
        items_path: str = None,
        next_link_path: str = None,  # multiple keys possible to access next link in nested json, separate with "/", like "key1/key2"
    ):
        super().__init__(
            url, method, data, params, headers, auth, json, timeout, credentials
        )
        self.items_path = items_path
        self.next_link_path = next_link_path

    def _execute_request(self) -> List[Dict[str, Any]]:
        request_args = copy.deepcopy(self._request_args)
        hits = []
        # pagination traversal: fetch a page, collect its items, follow the next link
        while True:
            # every page request is checked, not only the initial one
            try:
                response = requests.request(**request_args)
                response.raise_for_status()
            except requests.exceptions.HTTPError as exc:
                raise DataSetError("Failed to fetch data", exc) from exc
            except OSError as exc:
                raise DataSetError("Failed to connect to the remote server") from exc

            payload = response.json()
            hits.extend(dpath.util.get(payload, self.items_path))
            try:
                next_link = dpath.util.get(payload, self.next_link_path)
            # next link key is not present in json response
            except KeyError:
                break
            # next link key is present, but value is null / None
            if next_link is None:
                break
            # the next link is assumed to carry its own query string,
            # so the original params are dropped for follow-up requests
            request_args["url"] = next_link
            request_args.pop("params", None)
        return hits


# toy example with a paginated API, to demonstrate pagination traversal
data_set = PaginatedJSONAPIDataSet(
    url="https://pokeapi.co/api/v2/pokemon",
    items_path="results",
    next_link_path="next",
    params={
        "limit": 500
    }
)
data = data_set.load()
print(type(data)) # <class 'list'>
print(len(data)) # 1126
print(data[0]) # {'name': 'bulbasaur', 'url': 'https://pokeapi.co/api/v2/pokemon/1/'}

Can someone give me a tip if they have done something similar or refer me to a best practice example (I could not find one)?



Solution 1:[1]

You would have to define a custom dataset. It should be easy to take the existing implementation and extend or override it to handle the pagination part.

We'd love a PR back into the main project, as I think this would be useful for other users. Amazingly, to my knowledge, it has not come up before.
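
One way to keep such an override small is to isolate the link traversal in a plain function that receives a page-fetching callable, which also makes it testable without network access. The following is a minimal sketch, not kedro's API: `get_path`, `collect_items`, `fetch_page` and the `FAKE_PAGES` data are hypothetical names invented for illustration, and the "/"-separated path lookup is a simplified stand-in for what a library such as dpath provides.

```python
from typing import Any, Callable, Dict, List, Optional


def get_path(payload: Dict[str, Any], path: str) -> Any:
    """Look up a "/"-separated path such as "key1/key2" in nested dicts.

    Raises KeyError if any key along the path is missing.
    """
    current: Any = payload
    for key in path.split("/"):
        current = current[key]
    return current


def collect_items(
    fetch_page: Callable[[str], Dict[str, Any]],
    first_url: str,
    items_path: str,
    next_link_path: str,
) -> List[Any]:
    """Follow next links from first_url and gather the items of every page."""
    items: List[Any] = []
    url: Optional[str] = first_url
    while url is not None:
        payload = fetch_page(url)
        items.extend(get_path(payload, items_path))
        try:
            url = get_path(payload, next_link_path)
        except KeyError:
            # No next-link key in this response: this was the last page.
            url = None
    return items


# Stand-in for an HTTP call: two fake pages keyed by URL (made-up data).
FAKE_PAGES = {
    "page1": {"results": [{"name": "a"}, {"name": "b"}], "links": {"next": "page2"}},
    "page2": {"results": [{"name": "c"}], "links": {"next": None}},
}

all_items = collect_items(FAKE_PAGES.__getitem__, "page1", "results", "links/next")
print([item["name"] for item in all_items])  # ['a', 'b', 'c']
```

In a custom dataset, `fetch_page` could wrap `requests.request(...)`, call `raise_for_status()` and return `response.json()`. Whether the dataset's `load()` should then return the collected list or the raw responses remains a design decision for the dataset author.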

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 datajoely