How to create your own Airbyte connector and pull data from the Discord API

Prerequisites

If you have already completed the official tutorial and the PokeAPI tutorial, and are looking to level up your connector development skills, then you are in the right place. 

For this API we will need to create a Discord application and a Discord bot, and then get the bot token, which we will use in the authentication step. Bot authorization is a special server-less and callback-less OAuth2 flow that makes it easy for users to add bots to guilds.

You will also need Python 3.9 (or higher) and Docker.

Introduction

In this tutorial, we explain how to create an Airbyte source connector for three endpoints of the Discord API using Airbyte’s CDK framework. Among the advantages the CDK provides, we can mention:

  • It abstracts away the code that handles the connection.
  • It comes with a vast number of pre-designed connectors.
  • It offers a series of tools to create both a source and a destination with ease.

For more info on Airbyte’s CDK, you can visit the official documentation or watch this video.

Note: This tutorial covers connectors for REST HTTP APIs. Remember that you can always consult the Airbyte documentation for any questions.

As noted in the prerequisites, authentication relies on the bot token of a Discord application and its bot.

With this method, every API call must carry a header of the following form: {‘Authorization’: ‘Bot XXXXXXXXXXXXXXXX’}

The base URL for this API is: https://discord.com/api/.
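As a rough illustration of how the header and the base URL fit together, here is a minimal sketch that uses only the Python standard library. The helper names build_auth_header and build_request are hypothetical and are not part of the connector; the request is prepared but never sent.

```python
import urllib.request

BASE_URL = "https://discord.com/api/"


def build_auth_header(bot_token: str) -> dict:
    """Return the Authorization header used for bot tokens."""
    return {"Authorization": f"Bot {bot_token}"}


def build_request(path: str, bot_token: str) -> urllib.request.Request:
    """Prepare (but do not send) a GET request against the Discord API."""
    return urllib.request.Request(BASE_URL + path, headers=build_auth_header(bot_token))


# Placeholder token, as in the header example above.
request = build_request("users/@me", "XXXXXXXXXXXXXXXX")
print(request.full_url)
print(request.get_header("Authorization"))
```

In the connector itself this work is delegated to the CDK's authenticator, so the sketch is only meant to make the shape of the call visible.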

The endpoints that we are going to connect to are:

  • Get Current User Guilds
  • Get Current User
  • Get Guild

Now let’s move on to the practical part and start with the connector itself.

Step 1: Create the connector structure and basic configuration

Following the Airbyte tutorial for the PokeAPI, you can generate a template for your own connector. These are the steps we used for this connector:

  1. Clone the git repository:
git clone --depth 1 https://github.com/airbytehq/airbyte/
  2. Generate a new source:
cd airbyte/airbyte-integrations/connector-templates/generator
./generate.sh

Select the Python HTTP API Source.

In this case we named it “discord”.

  3. Create a virtual environment to test it locally:
cd ../../connectors/source-discord
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Note: you need Python 3.9 or higher, and also Docker.

Source Template

In the folder of the source you’ve just generated, i.e. “airbyte-integrations/connectors/source-discord”, you will work with four important files:

source_discord/spec.yaml

This file specifies which parameters must be given to Airbyte in order to make a connection. In this case we just need a token, so we replace the TODO entry in the required list with token:

  required:
    - token

And replace the properties with:

  properties:
    token:
      type: string
      description: Token for access.
      airbyte_secret: true

The properties of each parameter are straightforward: type specifies the parameter's data type, description is the text shown in the Airbyte UI, and airbyte_secret is a boolean that obfuscates the field when it is true.

Other parameters can be used in spec.yaml if needed, such as a start date and end date (with the format property) or a client id and client secret if the call requires them.

secrets/config.json

In config.json we include every parameter declared in spec.yaml and its value:

{
   "token": "Put your token here"
}
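To make the relationship between spec.yaml and config.json concrete, here is a minimal sketch that checks a config against the required list. REQUIRED_FIELDS and missing_fields are hypothetical names used only for illustration; Airbyte performs its own validation against the spec.

```python
import json

# Mirrors the "required" list declared in spec.yaml (assumption: only "token").
REQUIRED_FIELDS = ["token"]


def missing_fields(config: dict, required=REQUIRED_FIELDS) -> list:
    """Return the required parameter names that are absent or empty in config."""
    return [name for name in required if not config.get(name)]


config = json.loads('{"token": "Put your token here"}')
print(missing_fields(config))  # nothing is missing
print(missing_fields({}))      # "token" is reported as missing
```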

configured_catalog.json

We need to create a file named configured_catalog.json, which contains the list of streams, one per endpoint (Current User, Current User Guilds, Guild). We build this file as we create and check each stream. As we’ll see later, each stream has its own class (in PascalCase), and the name of the stream must be the same, but in snake_case:

{
    "streams": [
        {
            "stream": {
                "name": "current_user",
                "json_schema": {},
                "supported_sync_modes": ["full_refresh"]
            },
            "sync_mode": "full_refresh",
            "destination_sync_mode": "overwrite"
        },
        {
            "stream": {
                "name": "current_user_guilds",
                "json_schema": {},
                "supported_sync_modes": ["full_refresh"]
            },
            "sync_mode": "full_refresh",
            "destination_sync_mode": "overwrite"
        },
        {
            "stream": {
                "name": "guilds",
                "json_schema": {},
                "supported_sync_modes": ["full_refresh"]
            },
            "sync_mode": "full_refresh",
            "destination_sync_mode": "overwrite"
        }
    ]
}

As you can see, in this file we define the order of synchronization, the endpoint name, the sync mode and the destination sync mode.
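The naming rule between class names and stream names can be sketched as follows. By default the CDK derives a stream's name from its class name; the regex below is only an illustration of that conversion, not the CDK's own helper, and class_name_to_stream_name is a hypothetical name.

```python
import re


def class_name_to_stream_name(class_name: str) -> str:
    """Convert a PascalCase class name into a snake_case stream name."""
    # Insert "_" before every capital letter except the first, then lowercase.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", class_name).lower()


for name in ("CurrentUser", "CurrentUserGuilds", "Guilds"):
    print(name, "->", class_name_to_stream_name(name))
```

The three results match the names used in configured_catalog.json above.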

source_discord/source.py

Here we will define the classes and methods that will be the core of our connector. 

A first approach could be to build, for each endpoint, a class that inherits from HttpStream, thus gaining all the functionality of the Airbyte CDK. However, since all endpoints have at least some aspects in common, a better approach is to create a general class, DiscordStream, that the other classes inherit from. In other words, we will have a subclass of DiscordStream for each endpoint. The general class contains a default version of the methods the connector needs, as well as some variables common to all endpoints.

In order for the subclasses to work, the following methods are necessary:

  • parse_response: Instructs the stream how to parse the API response. This returns an Iterable.
  • next_page_token: If the response comes in pages, this method returns the token of the next page, and Airbyte keeps making calls as long as next_page_token returns a non-None result.
  • path: Specifies the path of an endpoint.
  • request_params: Returns a dictionary with the query parameters of the call.
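How these methods cooperate can be pictured with a simplified driver loop. This is a sketch of the idea only, not the CDK's actual implementation: read_all_pages, the PAGES dictionary and the page tokens are made up for the example.

```python
from typing import Any, Callable, Iterable, Iterator, Optional


def read_all_pages(
    fetch_page: Callable[[Optional[Any]], Any],
    parse_response: Callable[[Any], Iterable[Any]],
    next_page_token: Callable[[Any], Optional[Any]],
) -> Iterator[Any]:
    """Keep requesting pages until next_page_token returns None."""
    token = None
    while True:
        response = fetch_page(token)
        yield from parse_response(response)
        token = next_page_token(response)
        if token is None:
            break


# A fake two-page API, keyed by page token (None means "first page").
PAGES = {
    None: {"items": [1, 2], "next": "p2"},
    "p2": {"items": [3], "next": None},
}

records = list(
    read_all_pages(
        fetch_page=lambda token: PAGES[token],
        parse_response=lambda response: response["items"],
        next_page_token=lambda response: response["next"],
    )
)
print(records)  # [1, 2, 3]
```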

Another important class we need to create is SourceDiscord, which will inherit from AbstractSource, thus handling the connection, authentication and executing the API calls.

This class requires two mandatory methods:

  • check_connection: Validates access and permissions. The command to execute the Check function is:
python main.py check --config secrets/config.json
  • streams: Returns the list of streams the source exposes; Airbyte then reads from each of them. Running a read is useful to verify that the calls work correctly. The command to execute the Read function is:
python main.py read --config secrets/config.json --catalog configured_catalog.json

For more details on AbstractSource, see the Airbyte CDK documentation.

Let’s continue with the code that source.py will contain for the Discord source connector.

Step 2: Coding source.py

DiscordStream 

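DiscordStream defines the variables and methods shared by every endpoint. In this case it defines the url_base (ending with a “/”) and a default version of the mandatory methods. In particular, parse_response decodes the JSON response and then extracts the records from data_field if it is set, or returns the response as is.

from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple

import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator


class DiscordStream(HttpStream, ABC):
    url_base = "https://discord.com/api/"
    # Key that wraps the records in the response; None means the response is the data itself.
    data_field = None

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # No pagination by default; paginated endpoints override this method.
        return None

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        return {}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        json_response = response.json()
        self.records = json_response.get(self.data_field, []) if self.data_field is not None else json_response
        yield from self.records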

Get Current User Guilds endpoint

This endpoint returns all the guilds the bot is in (remember to include “guilds” in the permissions scope).

CurrentUserGuilds is the specific class for the Current User Guilds endpoint. 
We start by having a look at the official documentation page of the API: https://discord.com/developers/docs/resources/user#get-current-user-guilds

Current User Guilds documentation

Let’s explore the endpoint using Postman. In this screenshot we can see the response of the call:

Get Current user Guilds endpoint on Postman

Analyzing this response we can start developing the class for this endpoint. 

  • The class defines the variable primary_key (the unique identifier), which is “id” in this case.
  • It sets the variable data_field to None, because the response arrives as a list of dictionaries.
  • It defines the endpoint's path through the path method, which every endpoint must implement (“users/@me/guilds”).
  • Its next_page_token method takes the response and returns the id of the last record.
  • Its request_params method sets the parameters of the call:
    • “limit”, the maximum number of records returned per call;
    • “after”, which is added when next_page_token is not None, so that the next call resumes after the last record already read.

These last two methods override the ones defined in the parent class.

class CurrentUserGuilds(DiscordStream):
    primary_key = "id"
    data_field = None

    def request_params(
        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        # Ask for up to 200 guilds per call; on later pages add the "after" cursor.
        params = {"limit": 200}
        if next_page_token:
            params.update(next_page_token)
        return params

    def path(
        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> str:
        return "users/@me/guilds"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # The id of the last guild on this page is the cursor for the next call.
        records = response.json()
        return {"after": records[-1]["id"]} if records else None
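The “after” cursor can be tried out without calling Discord. The sketch below simulates the endpoint with an in-memory list; GUILDS, fake_get_guilds and sync_guilds are hypothetical names, and the small limit is chosen only to force several pages.

```python
# Five fake guilds standing in for the real API response.
GUILDS = [{"id": str(i)} for i in range(1, 6)]


def fake_get_guilds(limit, after=None):
    """Return up to `limit` guilds that come after the guild with id `after`."""
    start = 0
    if after is not None:
        start = next(i for i, guild in enumerate(GUILDS) if guild["id"] == after) + 1
    return GUILDS[start:start + limit]


def sync_guilds(limit):
    """Page through the fake endpoint the same way the stream does."""
    params = {"limit": limit}
    collected, calls = [], 0
    while True:
        page = fake_get_guilds(**params)
        calls += 1
        collected.extend(page)
        if not page:  # an empty page means there is no next token
            break
        params = {"limit": limit, "after": page[-1]["id"]}
    return collected, calls


collected, calls = sync_guilds(limit=2)
print(len(collected), calls)  # 5 guilds read in 4 calls
```

Note that with this stopping rule the loop always makes one final call that returns an empty page.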

Get Current User endpoint

The next endpoint to develop is Current User, which returns information about the user that owns the token, in our case the bot itself.

https://discord.com/developers/docs/resources/user#get-current-user

Get Current User endpoint postman

In this case, the response is a dictionary (unlike the previous endpoint, where we got a list). Therefore, we need to adapt the parse_response method.

We’ll add the CurrentUser endpoint’s class. This endpoint requires its own path and a different parsing method, so we override parse_response to emit the dictionary as a single record instead of iterating over it.

class CurrentUser(DiscordStream):
    primary_key = "id"
    data_field = None
 
    def path(
        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> str:
        return "users/@me"
 
    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        # The endpoint returns a single user object, so emit it as one record.
        yield response.json()
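Why the dictionary has to be treated as a single record becomes clear when you iterate over it directly: Python yields its keys, not the record. Here is a minimal sketch, with normalize_records as a hypothetical helper and a made-up user payload.

```python
def normalize_records(payload):
    """Return a list of records whether the API sent one object or a list of them."""
    if isinstance(payload, dict):
        return [payload]
    return list(payload)


user = {"id": "1", "username": "my-bot"}  # made-up sample payload

print(list(user))               # ['id', 'username'], iterating a dict yields its keys
print(normalize_records(user))  # the whole dictionary, as one record
```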

Get Guild endpoint

This endpoint gives us specific information about a single guild, so we need the guild id as a path parameter. As we have already seen in the “Get Current User Guilds” endpoint, the guild id is included in that response, which we use as input for this endpoint.

https://discord.com/developers/docs/resources/guild#get-guild

Get Guild endpoint postman

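Then we create the Guilds class, which inherits from two classes: the general DiscordStream class and DiscordSubStream, a helper class that is not listed in this tutorial. Judging from how stream_slice is used below, it supplies each record of the parent stream as a stream slice (the CDK's HttpSubStream works in a comparable way). In this class, the variable guild_id takes the id of each record returned by the parent. Later we will have to pass the parent stream as a parameter for this to work.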

class Guilds(DiscordSubStream, DiscordStream):
    primary_key = "id"
    data_field = None
 
    def path(
        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> str:
        guild_id = stream_slice['parent']['id']
        return f"guilds/{guild_id}"
 
    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        # The endpoint returns a single guild object, so emit it as one record.
        yield response.json()
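The parent/child relationship can be pictured with plain Python. In this sketch, stream_slices and guild_path are simplified stand-ins (assumptions, not the CDK's code): each parent record becomes a slice of the form {"parent": record}, and the child builds one path per slice.

```python
def stream_slices(parent_records):
    """Wrap every parent record in a slice, one slice per guild."""
    for record in parent_records:
        yield {"parent": record}


def guild_path(stream_slice):
    """Build the Get Guild path from the id carried in the slice."""
    guild_id = stream_slice["parent"]["id"]
    return f"guilds/{guild_id}"


# Made-up records standing in for the Get Current User Guilds response.
parent_records = [{"id": "111", "name": "first"}, {"id": "222", "name": "second"}]
paths = [guild_path(s) for s in stream_slices(parent_records)]
print(paths)  # ['guilds/111', 'guilds/222']
```

The child stream therefore makes one call per guild returned by the parent.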

SourceDiscord 

Finally, we will create the SourceDiscord class, which will handle connection, authentication, etc by inheriting these features from AbstractSource. 

Source Class (SourceDiscord)

In this class we are going to define three methods:

  • custom_authentication:

This method is going to take the credentials and return an auth variable. In this case, we simply use a token in the header. In order to send the headers, we define the variables auth_method as ‘Bot’, auth_header as ‘Authorization’ and _token which is taken from the config.

class SourceDiscord(AbstractSource):
    def custom_authentication(self, config):
        # Produces the header "Authorization: Bot <token>".
        auth_method = "Bot"
        auth_header = "Authorization"
        _token = config["token"]

        return TokenAuthenticator(token=_token, auth_method=auth_method, auth_header=auth_header)

The TokenAuthenticator class takes the auth parameters to make the correct header.

  • check_connection:

It is used to verify that the connection works:

    def check_connection(self, logger, config) -> Tuple[bool, Any]:

        auth = self.custom_authentication(config)

        headers = auth.get_auth_header()
        url = "https://discord.com/api/users/@me"
        try:
            # Any HTTP error status raises here and is reported as a failed check.
            response = requests.get(url, headers=headers)
            response.raise_for_status()
            return True, None
        except requests.exceptions.RequestException as e:
            return False, e

In this case we chose to check the connection with the Current User endpoint.

  • streams:

This method returns a list of streams with the corresponding variables to authenticate, or other functionalities that we may need:

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:

        auth = self.custom_authentication(config)

        return [
            CurrentUser(authenticator=auth),
            CurrentUserGuilds(authenticator=auth),
            # Guilds receives its parent stream, which supplies the guild ids.
            Guilds(CurrentUserGuilds(authenticator=auth)),
        ]

In this case we need to pass the auth variable and also the parent variable (CurrentUserGuilds) for the Guilds endpoint.

Step 3: Development of basic schemas

source_discord/schemas

As a last step, we must create a schema that describes the structure of the response for each endpoint, specifying the type of data that we expect to receive. Each of these schemas must be a JSON file located in the schemas folder, with the same name used in configured_catalog.json.

current_user_guilds.json

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "id": {
            "type": ["null", "string"]
        },
        "name": {
            "type": ["null", "string"]
        },
        "icon": {
            "type": ["null", "string"]
        },
        "owner": {
            "type": ["null", "boolean"]
        },
        "permissions": {
            "type": ["null", "integer"]
        },
        "features": {
            "type": ["null", "array"]
        },
        "permissions_new": {
            "type": ["null", "string"]
        }
    }
}

current_user.json

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "id": {
            "type": ["null", "string"]
        },
        "username": {
            "type": ["null", "string"]
        },
        "avatar": {
            "type": ["null", "string"]
        },
        "avatar_decoration": {
            "type": ["null", "object"]
        },
        "discriminator": {
            "type": ["null", "string"]
        },
        "public_flags": {
            "type": ["null", "integer"]
        },
        "flags": {
            "type": ["null", "integer"]
        },
        "bot": {
            "type": ["null", "boolean"]
        },
        "banner": {
            "type": ["null", "string"]
        },
        "banner_color": {
            "type": ["null", "object"]
        },
        "accent_color": {
            "type": ["null", "integer"]
        },
        "bio": {
            "type": ["null", "string"]
        },
        "locale": {
            "type": ["null", "string"]
        },
        "mfa_enabled": {
            "type": ["null", "boolean"]
        },
        "email": {
            "type": ["null", "string"]
        },
        "verified": {
            "type": ["null", "boolean"]
        }
    }
}

guilds.json

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "id": {
            "type": ["null", "string"]
        },
        "name": {
            "type": ["null", "string"]
        },
        "icon": {
            "type": ["null", "string"]
        },
        "description": {
            "type": ["null", "string"]
        },
        "splash": {
            "type": ["null", "string"]
        },
        "discovery_splash": {
            "type": ["null", "string"]
        },
        "features": {
            "type": ["null", "array"]
        },
        "emojis": {
            "type": ["null", "array"]
        },
        "stickers": {
            "type": ["null", "array"]
        },
        "banner": {
            "type": ["null", "string"]
        },
        "owner_id": {
            "type": ["null", "string"]
        },
        "application_id": {
            "type": ["null", "string"]
        },
        "region": {
            "type": ["null", "string"]
        },
        "afk_channel_id": {
            "type": ["null", "string"]
        },
        "afk_timeout": {
            "type": ["null", "integer"]
        },
        "system_channel_id": {
            "type": ["null", "string"]
        },
        "widget_enabled": {
            "type": ["null", "boolean"]
        },
        "widget_channel_id": {
            "type": ["null", "string"]
        },
        "verification_level": {
            "type": ["null", "integer"]
        },
        "roles": {
            "type": ["null", "array"],
            "items": {
                "type": ["null", "object"],
                "properties": {
                    "id": {
                        "type": ["null", "string"]
                    },
                    "name": {
                        "type": ["null", "string"]
                    },
                    "permissions": {
                        "type": ["null", "integer"]
                    },
                    "position": {
                        "type": ["null", "integer"]
                    },
                    "color": {
                        "type": ["null", "integer"]
                    },
                    "hoist": {
                        "type": ["null", "boolean"]
                    },
                    "managed": {
                        "type": ["null", "boolean"]
                    },
                    "mentionable": {
                        "type": ["null", "boolean"]
                    },
                    "icon": {
                        "type": ["null", "string"]
                    },
                    "unicode_emoji": {
                        "type": ["null", "string"]
                    },
                    "flags": {
                        "type": ["null", "integer"]
                    },
                    "permissions_new": {
                        "type": ["null", "string"]
                    }
                }
            }
        },
        "default_message_notifications": {
            "type": ["null", "integer"]
        },
        "mfa_level": {
            "type": ["null", "integer"]
        },
        "explicit_content_filter": {
            "type": ["null", "integer"]
        },
        "max_presences": {
            "type": ["null", "integer"]
        },
        "max_members": {
            "type": ["null", "integer"]
        },
        "max_video_channel_users": {
            "type": ["null", "integer"]
        },
        "vanity_url_code": {
            "type": ["null", "string"]
        },
        "premium_tier": {
            "type": ["null", "integer"]
        },
        "premium_subscription_count": {
            "type": ["null", "integer"]
        },
        "system_channel_flags": {
            "type": ["null", "integer"]
        },
        "preferred_locale": {
            "type": ["null", "string"]
        },
        "rules_channel_id": {
            "type": ["null", "string"]
        },
        "public_updates_channel_id": {
            "type": ["null", "string"]
        }
    }
}
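A first draft of such a schema can be derived from a sample record. The following is a minimal sketch with hypothetical helpers (infer_type, draft_schema) and a made-up sample; it is not a tool shipped with Airbyte. Fields that are null in the sample carry no type information, so their real type has to be taken from the Discord documentation.

```python
def infer_type(value):
    """Map a Python value to a JSON Schema type name, or None if unknown."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "number"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        return "array"
    if isinstance(value, dict):
        return "object"
    return None  # a null value does not reveal its type


def draft_schema(record):
    """Build a nullable draft-07 schema from one sample record."""
    properties = {}
    for key, value in record.items():
        json_type = infer_type(value)
        properties[key] = {"type": ["null", json_type] if json_type else ["null"]}
    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": properties,
    }


sample = {"id": "1", "owner": True, "permissions": 8, "features": [], "icon": None}
print(draft_schema(sample)["properties"])
```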

Step 4: Testing our connector

Finally, we need to check that our connector is working as intended. For that, we can do two things: run the Read function shown earlier, or pack the newly created connector into a Docker image by running:

docker build . -t airbyte/source-discord-api:dev

and then, at the root of the Airbyte directory, starting the Airbyte console by running: 

docker-compose up

And, finally, configuring the connector from the Airbyte UI. For more details about this, refer to the PokeAPI tutorial or check this GitHub repo with the code.
