Skip to main content
A lightweight Python wrapper for the OutX API. Copy the class below into your project — no package installation needed beyond requests.

Quick Start

import os
from outx import OutXClient

# Read the API key from the environment rather than hard-coding it
client = OutXClient(api_key=os.environ["OUTX_API_KEY"])

# Fetch a LinkedIn profile
profile = client.fetch_profile("williamhgates")
print(profile["profile"]["full_name"])  # Bill Gates

# Get posts from a watchlist
posts = client.get_posts(watchlist_id="your-watchlist-id", sort_by="recent_first")
for post in posts["data"]:
    print(f"{post['author_name']}: {post['content'][:100]}")

# Like the first post (skipped when the watchlist returned no posts,
# which would otherwise raise IndexError on posts["data"][0])
if posts["data"]:
    client.like_post(post_id=posts["data"][0]["id"], user_email="you@company.com")

The Client Class

Save this as outx.py in your project:
"""OutX API client — drop-in wrapper for the OutX REST API."""

import time
import requests


class OutXClient:
    """Lightweight client for the OutX API.

    LinkedIn Data API methods submit an async task and block while polling
    for its result. Watchlists & Engagement API methods return the decoded
    JSON response of a single request.

    Args:
        api_key: Your OutX API key.
        base_url: API base URL (default: https://api.outx.ai). Trailing
            slashes are stripped.
        poll_interval: Seconds between task status polls (default: 5).
        poll_timeout: Max seconds to wait for async tasks (default: 150).
        request_timeout: Timeout in seconds passed to every HTTP request
            (default: 30). Without it a stalled connection would block
            forever and ``poll_timeout`` could not be honoured.
    """

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.outx.ai",
        poll_interval: int = 5,
        poll_timeout: int = 150,
        request_timeout: float = 30,
    ):
        # Strip trailing slashes so f"{base_url}/{path}" never yields "//".
        self.base_url = base_url.rstrip("/")
        self.poll_interval = poll_interval
        self.poll_timeout = poll_timeout
        self.request_timeout = request_timeout
        self.session = requests.Session()
        self.session.headers.update({
            "x-api-key": api_key,
            "Content-Type": "application/json",
        })

    # ── LinkedIn Data API (async) ────────────────────────────

    def fetch_profile(self, profile_slug: str) -> dict:
        """Fetch a LinkedIn profile by slug. Blocks until complete.

        Returns the task output. Raises requests.exceptions.HTTPError on an
        HTTP error response, RuntimeError if the task fails, and
        TimeoutError if it does not finish within ``poll_timeout``.
        """
        return self._run_task(
            "linkedin-agent/fetch-profile",
            {"profile_slug": profile_slug},
        )

    def fetch_posts_by_urn(self, profile_urns: list[str]) -> dict:
        """Fetch posts from LinkedIn profiles by URN. Blocks until complete.

        Raises the same exceptions as ``fetch_profile``.
        """
        return self._run_task(
            "linkedin-agent/fetch-profiles-posts",
            {"profile_urns": profile_urns},
        )

    def like_linkedin_post(self, social_urn: str) -> dict:
        """Like a LinkedIn post by activity URN. Blocks until complete.

        Raises the same exceptions as ``fetch_profile``.
        """
        return self._run_task(
            "linkedin-agent/like-post",
            {"social_urn": social_urn},
        )

    def comment_linkedin_post(self, social_urn: str, comment_text: str) -> dict:
        """Comment on a LinkedIn post by activity URN. Blocks until complete.

        Raises the same exceptions as ``fetch_profile``.
        """
        return self._run_task(
            "linkedin-agent/comment-post",
            {"social_urn": social_urn, "comment_text": comment_text},
        )

    def get_task_status(self, task_id: str) -> dict:
        """Check the status of an async task.

        Returns the raw JSON response; ``_wait_for_task`` reads
        ``["data"]["status"]`` and ``["data"]["task_output"]`` from it.
        """
        return self._get(
            "linkedin-agent/get-task-status",
            params={"api_agent_task_id": task_id},
        )

    # ── Watchlists & Engagement API (sync) ───────────────────

    def create_keyword_watchlist(self, keywords: list, **kwargs) -> dict:
        """Create a keyword watchlist.

        Args:
            keywords: List of keyword strings or objects with
                      keyword/required_keywords/exclude_keywords.
            **kwargs: name, description, labels, fetchFreqInHours (1,3,6,12,24,48,72).
        """
        return self._post("api-keyword-watchlist", {"keywords": keywords, **kwargs})

    def create_people_watchlist(self, profiles: list, **kwargs) -> dict:
        """Create a people watchlist.

        Args:
            profiles: List of LinkedIn profile URLs or slugs.
            **kwargs: name, description, labels, fetchFreqInHours.
        """
        return self._post("api-people-watchlist", {"profiles": profiles, **kwargs})

    def create_company_watchlist(self, companies: list, **kwargs) -> dict:
        """Create a company watchlist.

        Args:
            companies: List of LinkedIn company URLs or slugs.
            **kwargs: name, description, labels, fetchFreqInHours.
        """
        return self._post("api-company-watchlist", {"companies": companies, **kwargs})

    def get_watchlists(self, watchlist_type: str = "keyword") -> dict:
        """List watchlists. Type: 'keyword', 'people', or 'company'.

        Raises:
            KeyError: If ``watchlist_type`` is not one of the three types.
        """
        endpoint_map = {
            "keyword": "api-keyword-watchlist",
            "people": "api-people-watchlist",
            "company": "api-company-watchlist",
        }
        return self._get(endpoint_map[watchlist_type])

    def get_posts(self, watchlist_id: str = None, **kwargs) -> dict:
        """Get posts from watchlists.

        Args:
            watchlist_id: Filter by watchlist (omit for all).
            **kwargs: page, sort_by, start_date, end_date, seniority_level,
                      trending, lang, post_type, search_term.
        """
        candidates = {**kwargs, "watchlist_id": watchlist_id}
        # Drop unset filters so they are not sent as query parameters.
        params = {key: value for key, value in candidates.items() if value is not None}
        return self._get("api-posts", params=params)

    def like_post(self, post_id: str, user_email: str, actor_type: str = "user") -> dict:
        """Like a watchlist post."""
        return self._post(
            "api-like",
            {"post_id": post_id, "user_email": user_email, "actor_type": actor_type},
        )

    def comment_post(self, post_id: str, user_email: str, comment_text: str, actor_type: str = "user") -> dict:
        """Comment on a watchlist post."""
        return self._post(
            "api-comment",
            {
                "post_id": post_id,
                "user_email": user_email,
                "comment_text": comment_text,
                "actor_type": actor_type,
            },
        )

    # ── Internal ─────────────────────────────────────────────

    def _post(self, path: str, payload: dict) -> dict:
        """POST a JSON payload to ``path`` and return the decoded JSON body."""
        resp = self.session.post(
            f"{self.base_url}/{path}",
            json=payload,
            timeout=self.request_timeout,
        )
        resp.raise_for_status()
        return resp.json()

    def _get(self, path: str, params: dict = None) -> dict:
        """GET ``path`` with optional query params and return the decoded JSON body."""
        resp = self.session.get(
            f"{self.base_url}/{path}",
            params=params,
            timeout=self.request_timeout,
        )
        resp.raise_for_status()
        return resp.json()

    def _run_task(self, path: str, payload: dict) -> dict:
        """Submit an async task to ``path`` and block until its output is ready."""
        submitted = self._post(path, payload)
        return self._wait_for_task(submitted["api_agent_task_id"])

    def _wait_for_task(self, task_id: str) -> dict:
        """Poll until an async task completes or times out.

        The timeout is measured against the wall clock, so time spent inside
        the status requests counts toward ``poll_timeout`` and a
        ``poll_interval`` of 0 cannot loop forever. The status is always
        checked at least once.

        Raises:
            RuntimeError: If the task reports status "failed".
            TimeoutError: If the task is not completed within ``poll_timeout``.
        """
        deadline = time.monotonic() + self.poll_timeout
        while True:
            result = self.get_task_status(task_id)
            status = result["data"]["status"]

            if status == "completed":
                return result["data"]["task_output"]
            if status == "failed":
                raise RuntimeError(f"Task {task_id} failed: {result['data'].get('task_output')}")

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline; the last poll lands right on it.
            time.sleep(min(self.poll_interval, remaining))

        raise TimeoutError(f"Task {task_id} did not complete within {self.poll_timeout}s")

Usage Examples

Monitor Watchlist for Buying Intent

import os
from outx import OutXClient

client = OutXClient(api_key=os.environ["OUTX_API_KEY"])

# Pull the first page of the watchlist's posts, most recent first
posts = client.get_posts(
    watchlist_id="your-watchlist-id",
    sort_by="recent_first",
    page=1,
)

# Report only the high-engagement posts (more than 10 likes)
for post in posts["data"]:
    likes = post.get("likes_count", 0)
    if likes > 10:
        author = post["author_name"]
        excerpt = post["content"][:150]
        print(f"Hot post by {author}: {excerpt}")

Fetch Profile + Posts Pipeline

import time

# Step 1: look up the prospect's profile and show the basics
profile = client.fetch_profile("target-prospect")
details = profile["profile"]
print(f"Name: {details['full_name']}")
print(f"Headline: {details['headline']}")

# Step 2: pause so consecutive LinkedIn Data API calls are spaced out,
# then fetch the prospect's posts via the profile URN
time.sleep(30)
profile_urn = details["profile_urn"]
posts = client.fetch_posts_by_urn([profile_urn])
post_count = len(posts.get("posts", []))
print(f"Found {post_count} posts")
Space out LinkedIn Data API calls. The fetch_profile, fetch_posts_by_urn, like_linkedin_post, and comment_linkedin_post methods all execute through real LinkedIn sessions. Add time.sleep(30) between consecutive calls. See Rate Limits.

Create Watchlist + Poll for New Posts

# Keywords can be plain strings or objects carrying extra filters
crm_keywords = [
    "looking for a CRM",
    {"keyword": "CRM recommendation", "exclude_keywords": ["hiring"]},
]

# Create the keyword watchlist with fetchFreqInHours=6
watchlist = client.create_keyword_watchlist(
    keywords=crm_keywords,
    name="CRM Buying Intent",
    fetchFreqInHours=6,
)

watchlist_id = watchlist["data"]["id"]
print(f"Watchlist created: {watchlist_id}")

# Nothing to read yet: posts appear only after the first scan, whose
# timing depends on fetchFreqInHours. Check for posts after that cycle.

Error Handling

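The client raises standard exceptions: requests.exceptions.HTTPError for HTTP error responses, TimeoutError when an async task does not complete within poll_timeout, and RuntimeError when an async task reports a failed status: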
from requests.exceptions import HTTPError

try:
    profile = client.fetch_profile("nonexistent-slug")
except HTTPError as e:
    # Raised by resp.raise_for_status() for any 4xx/5xx response
    if e.response.status_code == 403:
        print("Chrome extension not active — check your browser")
    elif e.response.status_code == 429:
        print("Rate limited — slow down")
    else:
        # Space between status code and body so they do not run together
        print(f"API error: {e.response.status_code} {e.response.text}")
except TimeoutError:
    # Raised when the task did not complete within poll_timeout
    print("Task timed out — Chrome extension may be offline")
except RuntimeError as e:
    # Raised when the task itself reports status "failed";
    # the message reads "Task <id> failed: <task_output>"
    print(e)

Partnerships

Interested in a collaboration or affiliate partnership? Reach out at support@outx.ai.

Learn More