Introducing Airbyte sources within LangChain

Editor's Note: This post was written in collaboration with the Airbyte team. They've made it really easy to connect even more data sources to LangChain as document loaders.

It’s now possible to use the Airbyte sources for Gong, Hubspot, Salesforce, Shopify, Stripe, Typeform and Zendesk Support directly within your LangChain-based application, where they are implemented as document loaders.

For example, to load the Stripe invoices for a user, you can use the AirbyteStripeLoader. Installation is simple: if you already have LangChain installed locally, you only need to install the source you are interested in, and you are ready to go:

pip install airbyte-source-stripe

After that, simply import the loader and pass in configuration and the stream you want to load:

from langchain.document_loaders.airbyte import AirbyteStripeLoader
config = {
  "client_secret": "<secret key>",
  "account_id": "<account id>",
  "start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>"
}
loader = AirbyteStripeLoader(config=config, stream_name="invoices")
documents = loader.load()
# use documents in vector store or otherwise
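The start_date value in the config above is an ISO timestamp in UTC. As a minimal sketch, assuming you want a rolling window rather than a fixed date, it can be computed with the standard library. The helper name iso_start_date and the 30-day window are illustrative assumptions, not part of the loader:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def iso_start_date(days_back: int, now: Optional[datetime] = None) -> str:
    """Return midnight UTC `days_back` days before `now`, e.g. 2020-10-20T00:00:00Z."""
    # Default to the current time in UTC; `now` can be injected for testing.
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    start = (now - timedelta(days=days_back)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return start.strftime("%Y-%m-%dT%H:%M:%SZ")


config = {
    "client_secret": "<secret key>",
    "account_id": "<account id>",
    "start_date": iso_start_date(30),  # hypothetical window: roughly the last 30 days
}
```

The resulting config can be passed to AirbyteStripeLoader in the same way as the hand-written one.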

Why does this matter?

This is the beginning of making Airbyte’s 300+ sources available as document loaders in LangChain.

Airbyte can move data from just about any source to your warehouse or vector database to power your LLM use case (check out this tutorial for setting up such a data pipeline!). This is normally done by using Airbyte Cloud or a local Airbyte instance, setting up a connection, and running it on a schedule (or via API trigger) to make sure your data stays fresh.

But if you are just getting started and are running everything locally, using a full Airbyte instance (including the UI, scheduling service, scale-out capabilities, etc.) may be overkill.

With this release, it’s easier than ever to run any Python-based source in LangChain directly within your Python runtime - no need to spin up an Airbyte instance or make API calls to Airbyte Cloud.

Moving between hosted and embedded Airbyte

As it’s the same code running under the hood, every Airbyte-built loader is compatible with the respective source in the Airbyte service. This means it’s trivial to lift your embedded loading pipeline into your self-hosted Airbyte installation or your Airbyte Cloud instance. The shape of the configuration object and the records is 100% compatible.

Running syncs on hosted Airbyte means:

  • UI to keep track of running pipelines
  • Alerting on failing syncs
  • Easily running pipelines on a schedule

Running syncs with LangChain loaders means:

  • No overhead for running yet another service
  • Full control over timing and pipeline execution

Mapping Airbyte records to LangChain documents

By default, each record gets mapped to a Document as part of the loader, with all the fields in the record becoming the metadata of the document. The text portion of the document is left as an empty string. To customize this behavior, you can pass in a record handler that builds the text part of the document from the record's data:

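from langchain.document_loaders.airbyte import AirbyteGongLoader
from langchain.schema import Document

def handle_record(record, id):
    # use the call title as the document text and keep all fields as metadata
    return Document(page_content=record.data["title"], metadata=record.data)
loader = AirbyteGongLoader(config=config, record_handler=handle_record, stream_name="calls")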
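A handler can also combine several fields into the text. The following is a minimal sketch that runs without LangChain installed: Document and FakeRecord are stand-ins for the real classes, and the field names in TEXT_FIELDS are hypothetical, so adjust them to the stream you are loading:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class Document:
    """Stand-in for LangChain's Document, so the sketch is self-contained."""
    page_content: str
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class FakeRecord:
    """Stand-in for the record passed to the handler; only `.data` is used."""
    data: Dict[str, Any]


# Hypothetical field names; pick the ones your stream actually provides.
TEXT_FIELDS = ("title", "description")


def handle_record(record, id: Optional[str]) -> Document:
    # Join the non-empty text fields; missing or empty fields are skipped.
    parts = [str(record.data[key]) for key in TEXT_FIELDS if record.data.get(key)]
    return Document(page_content="\n".join(parts), metadata=record.data)


doc = handle_record(FakeRecord({"title": "Kickoff call", "description": None, "id": 7}), "7")
full = handle_record(FakeRecord({"title": "Demo", "description": "Product walkthrough"}), None)
```

Skipping empty fields avoids a KeyError on records that lack one of the fields, which the single-field handler above would raise.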

Incremental loads

Since your Python application is effectively acting as the Airbyte platform, you have full control over how the “sync” is executed. For example, if your stream supports it, you can still benefit from incremental syncs by accessing the “last_state” property of the loader. This lets you load only the documents that changed since the last load, so you can update an existing vector database efficiently:

from airbyte_cdk.models.airbyte_protocol import AirbyteStateMessage
# persist the state after a load (text mode, since .json() returns a string)
with open('stripe_sync_checkpoint.json', 'w') as file:
    file.write(loader.last_state.json())

# ... later
with open('stripe_sync_checkpoint.json', 'r') as file:
    current_state = AirbyteStateMessage.parse_raw(file.read())
incremental_loader = AirbyteStripeLoader(config=config, stream_name="invoices", state=current_state)
new_docs = incremental_loader.load()
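On the very first run there is no checkpoint file yet, and a crash in the middle of a write could leave a truncated one. Below is a minimal sketch of checkpoint handling that covers both cases using only the standard library. The helper names save_checkpoint and load_checkpoint and the example state content are assumptions for illustration; in a real pipeline the string would come from loader.last_state.json():

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def save_checkpoint(path: Path, state_json: str) -> None:
    """Write the state to a temp file, then rename it over the checkpoint."""
    json.loads(state_json)  # fail early if the state is not valid JSON
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(state_json, encoding="utf-8")
    tmp.replace(path)  # replaces the old checkpoint in one step


def load_checkpoint(path: Path) -> Optional[str]:
    """Return the stored state, or None when no checkpoint exists yet."""
    if not path.exists():
        return None
    return path.read_text(encoding="utf-8")


with tempfile.TemporaryDirectory() as tmp_dir:
    checkpoint = Path(tmp_dir) / "stripe_sync_checkpoint.json"
    first_run_state = load_checkpoint(checkpoint)  # no file yet
    # Made-up state content, standing in for loader.last_state.json().
    save_checkpoint(checkpoint, '{"cursor": "2020-10-20T00:00:00Z"}')
    second_run_state = load_checkpoint(checkpoint)
```

When load_checkpoint returns None, the loader can be created without a state so that it performs a full load; otherwise the string can be parsed with AirbyteStateMessage.parse_raw as shown above.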

Custom sources

For now, the following Airbyte sources are available as pip packages (with more to come):

  • Gong pip install airbyte-source-gong
  • Hubspot pip install airbyte-source-hubspot
  • Salesforce pip install airbyte-source-salesforce
  • Shopify pip install airbyte-source-shopify
  • Stripe pip install airbyte-source-stripe
  • Typeform pip install airbyte-source-typeform
  • Zendesk Support pip install airbyte-source-zendesk-support

However, if you have implemented your own custom Airbyte sources, it’s also possible to integrate them by using the AirbyteCDKLoader base class that works with the Source interface of the Airbyte CDK:

from langchain.document_loaders.airbyte import AirbyteCDKLoader
from my_source.source import MyCustomSource # plug in your own source here
config = {
   # your custom configuration
}
loader = AirbyteCDKLoader(source_class=MyCustomSource, config=config, stream_name="my-stream")

You can also install sources from the main Airbyte repository directly via git. For example, to fetch the GitHub source, run:

pip install "source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github"

After that, the source is available to be plugged into the AirbyteCDKLoader:

from source_github.source import SourceGithub
issues_loader = AirbyteCDKLoader(source_class=SourceGithub, config=config, stream_name="issues")
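In the examples above, each loader is created for a single stream_name, so loading several streams means creating one loader per stream. A minimal sketch of that pattern follows; load_streams is a hypothetical helper, and StubLoader stands in for a real loader so the example runs without any source installed:

```python
from typing import Any, Callable, Dict, Iterable, List


def load_streams(
    make_loader: Callable[[str], Any], stream_names: Iterable[str]
) -> Dict[str, List[Any]]:
    """Create one loader per stream and collect the loaded documents by stream name."""
    return {name: make_loader(name).load() for name in stream_names}


class StubLoader:
    """Stand-in for a real loader; it only mimics the .load() method."""

    def __init__(self, stream_name: str) -> None:
        self.stream_name = stream_name

    def load(self) -> List[str]:
        return [f"{self.stream_name}-doc"]


docs_by_stream = load_streams(StubLoader, ["issues", "pull_requests"])
```

With a real source, make_loader could be a lambda such as lambda name: AirbyteCDKLoader(source_class=SourceGithub, config=config, stream_name=name); the stream names used here are illustrative.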

Check out the connector development documentation to learn how to write your own sources. They are easy to get started with, and they let you move seamlessly from local embedded loaders to a hosted Airbyte instance, depending on your needs.

Any questions? We would love to hear from you

If you are interested in leveraging Airbyte to ship data to your LLM-based applications, please take a moment to fill out our survey so we can make sure to prioritize the most important features.

If you have questions or are interested in other existing sources being exposed as loaders this way, do not hesitate to reach out on our community Slack channel or in the Airbyte channel on the LangChain Discord server.