Skip to content

Authentication

Authentication utilities for Prusa Connect.

This module mimics the pattern found in google-auth. It provides a Credentials object that can automatically refresh tokens and attach headers to requests.

How to use the most important parts: - PrusaConnectCredentials: The main credentials object for interacting with the SDK. Pass this to PrusaConnectClient(credentials=...). - interactive_login: A helper function to kick off an interactive OAuth2 flow, returning a JSON serializable dict with access and refresh tokens. - get_default_token_path: Helper to determine the standard configuration path to persist tokens locally.

FUNCTION DESCRIPTION
get_default_token_path

Returns the platform-specific path for the token file using platformdirs.

interactive_login

Performs the full PKCE login flow, including screen scraping and TOTP.

PrusaAccessToken

Bases: PrusaJwtModel

Structure of the Access Token.

Source code in src/prusa/connect/client/auth.py
102
103
104
105
106
107
108
109
110
111
class PrusaAccessToken(PrusaJwtModel):
    """Structure of the Access Token."""

    token_id: str = pydantic.Field(alias="jti")
    user_id: int = pydantic.Field(alias="sub")
    expires_at: datetime.datetime = pydantic.Field(alias="exp")
    session_id: str = pydantic.Field(alias="sid")
    app_slug: str = pydantic.Field(alias="app")
    token_type: str = pydantic.Field(alias="type")
    connect_id: str

PrusaConnectCredentials

Authentication credentials that allow making authorized API calls.

This class manages the lifecycle of the access token, including automatic refreshing when expired.

METHOD DESCRIPTION
__init__

Initialize credentials.

before_request

Injects the Authorization header into the request headers.

from_env

Factory: Load credentials from environment variables.

from_file

Factory: Load credentials from a JSON file.

load_default

Factory: Attempt to load credentials from default locations.

refresh

Forces a token refresh using the refresh token.

Source code in src/prusa/connect/client/auth.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
class PrusaConnectCredentials:
    """Authentication credentials that allow making authorized API calls.

    This class manages the lifecycle of the access token, including
    automatic refreshing when expired.
    """

    def __init__(
        self,
        token_info: dict[str, typing.Any] | PrusaJWTTokenSet,
        token_saver: collections.abc.Callable[[dict], None] | None = None,
    ):
        """Initialize credentials.

        Args:
            token_info: Dictionary or PrusaJWTTokenSet containing access_token, etc.
            token_saver: Optional callback executed when tokens are refreshed (to save to disk).
        """
        self._load_tokens(token_info)
        self.token_saver = token_saver
        self._session = requests.Session()  # Session for refresh calls

    def _load_tokens(self, data: dict[str, typing.Any] | PrusaJWTTokenSet) -> None:
        """Parses data into internal state."""
        if isinstance(data, dict):
            # Parse dict into PrusaJWTTokenSet
            self.tokens = PrusaJWTTokenSet(**data)
        else:
            self.tokens = data

    @property
    def valid(self) -> bool:
        """Checks if the current access token is valid (not expired)."""
        if not self.tokens.access_token:
            return False

        return _is_token_valid(self.tokens.access_token)

    def refresh(self) -> None:
        """Forces a token refresh using the refresh token."""
        if not self.tokens.refresh_token or not self.tokens.refresh_token.raw_token:
            raise exceptions.PrusaAuthError("Cannot refresh token: No refresh token present.")

        if not _is_token_valid(self.tokens.refresh_token):
            logger.error("Refresh token expired", expires_at=self.tokens.refresh_token.expires_at)
            raise exceptions.PrusaAuthError("Cannot refresh token: Refresh token is expired.")

        logger.debug("Refreshing access token...", refresh_token_id=self.tokens.refresh_token.token_id)
        payload = {
            "grant_type": "refresh_token",
            "client_id": consts.CLIENT_ID,
            "refresh_token": self.tokens.refresh_token.raw_token,
        }

        resp = self._session.post(consts.TOKEN_URL, data=payload)

        if resp.status_code == 200:
            new_data = resp.json()
            # We need to update our PrusaJWTTokenSet.
            # The refresh response typically contains a new access_token and optionally a new refresh_token.

            # Use dump_tokens() to get current state as dict of raw strings
            current_raw = self.tokens.dump_tokens()
            # Update with new raw strings from response
            current_raw.update(new_data)

            # Reload from the updated raw dict
            self._load_tokens(current_raw)
            logger.info("Token refreshed successfully.")

            if self.token_saver:
                # pass raw dict back to saver
                self.token_saver(self.tokens.dump_tokens())
        else:
            logger.error("Token refresh failed", status=resp.status_code, body=resp.text)
            raise exceptions.PrusaAuthError("Failed to refresh token. Re-authentication required.")

    def before_request(self, headers: collections.abc.MutableMapping[str, str | bytes]) -> None:
        """Injects the Authorization header into the request headers.

        Refreshes the token automatically if needed.
        """
        if not self.valid:
            self.refresh()

        headers["Authorization"] = f"Bearer {self.tokens.access_token.raw_token}"

    @classmethod
    def from_file(cls, path: Path | str) -> PrusaConnectCredentials | None:
        """Factory: Load credentials from a JSON file."""
        if isinstance(path, str):
            path = Path(path)

        try:
            logger.debug(f"Loading credentials from file: {path}")
            with path.open() as f:
                data = json.load(f)

            # Define a saver that updates this specific file
            def save_to_disk(new_data):
                logger.debug(f"Saving credentials to file: {path}")
                # Ensure directory exists (in case it was deleted)
                path.parent.mkdir(parents=True, exist_ok=True)
                with path.open("w") as f:
                    json.dump(new_data, f, indent=2)

                # Best-effort secure permissions
                with contextlib.suppress(OSError):
                    os.chmod(path, 0o600)

            return cls(data, token_saver=save_to_disk)
        except (FileNotFoundError, json.JSONDecodeError):
            logger.debug(f"No credentials found at file: {path.absolute()}")
            return None

    @classmethod
    def from_env(cls) -> PrusaConnectCredentials | None:
        """Factory: Load credentials from environment variables.

        Checks:
        1. PRUSA_TOKENS_JSON: A JSON string containing the full token set.
        2. PRUSA_TOKEN: A raw Access Token (JWT).
        """
        if json_str := os.environ.get("PRUSA_TOKENS_JSON"):
            logger.debug("Loading credentials from env: PRUSA_TOKENS_JSON")
            try:
                return cls(json.loads(json_str))
            except json.JSONDecodeError:
                logger.warning("Invalid JSON in PRUSA_TOKENS_JSON")

        if token := os.environ.get("PRUSA_TOKEN"):
            logger.debug("Loading credentials from env: PRUSA_TOKEN")
            try:
                # Pydantic will attempt to parse the string into the AccessToken model
                return cls({"access_token": token})
            except Exception as e:
                logger.debug(f"Could not create credentials from PRUSA_TOKEN: {e}")

        return None

    @classmethod
    def load_default(cls) -> PrusaConnectCredentials | None:
        """Factory: Attempt to load credentials from default locations.

        Priority:
        1. Environment Variables (PRUSA_TOKENS_JSON, PRUSA_TOKEN)
        2. Platform default config file
        """
        # 1. Environment
        if creds := cls.from_env():
            return creds

        # 2. Platform default file
        default_path = get_default_token_path()
        logger.debug("Checking for credentials at default path", path=default_path)
        if default_path.exists():
            return cls.from_file(default_path)

        return None

valid property

Checks if the current access token is valid (not expired).

__init__(token_info, token_saver=None)

Initialize credentials.

PARAMETER DESCRIPTION
token_info

Dictionary or PrusaJWTTokenSet containing access_token, etc.

TYPE: dict[str, Any] | PrusaJWTTokenSet

token_saver

Optional callback executed when tokens are refreshed (to save to disk).

TYPE: Callable[[dict], None] | None DEFAULT: None

Source code in src/prusa/connect/client/auth.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def __init__(
    self,
    token_info: dict[str, typing.Any] | PrusaJWTTokenSet,
    token_saver: collections.abc.Callable[[dict], None] | None = None,
):
    """Initialize credentials.

    Args:
        token_info: Dictionary or PrusaJWTTokenSet containing access_token, etc.
        token_saver: Optional callback executed when tokens are refreshed (to save to disk).
    """
    self._load_tokens(token_info)
    self.token_saver = token_saver
    self._session = requests.Session()  # Session for refresh calls

before_request(headers)

Injects the Authorization header into the request headers.

Refreshes the token automatically if needed.

Source code in src/prusa/connect/client/auth.py
269
270
271
272
273
274
275
276
277
def before_request(self, headers: collections.abc.MutableMapping[str, str | bytes]) -> None:
    """Injects the Authorization header into the request headers.

    Refreshes the token automatically if needed.
    """
    if not self.valid:
        self.refresh()

    headers["Authorization"] = f"Bearer {self.tokens.access_token.raw_token}"

from_env() classmethod

Factory: Load credentials from environment variables.

Checks: 1. PRUSA_TOKENS_JSON: A JSON string containing the full token set. 2. PRUSA_TOKEN: A raw Access Token (JWT).

Source code in src/prusa/connect/client/auth.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
@classmethod
def from_env(cls) -> PrusaConnectCredentials | None:
    """Factory: Load credentials from environment variables.

    Checks:
    1. PRUSA_TOKENS_JSON: A JSON string containing the full token set.
    2. PRUSA_TOKEN: A raw Access Token (JWT).
    """
    if json_str := os.environ.get("PRUSA_TOKENS_JSON"):
        logger.debug("Loading credentials from env: PRUSA_TOKENS_JSON")
        try:
            return cls(json.loads(json_str))
        except json.JSONDecodeError:
            logger.warning("Invalid JSON in PRUSA_TOKENS_JSON")

    if token := os.environ.get("PRUSA_TOKEN"):
        logger.debug("Loading credentials from env: PRUSA_TOKEN")
        try:
            # Pydantic will attempt to parse the string into the AccessToken model
            return cls({"access_token": token})
        except Exception as e:
            logger.debug(f"Could not create credentials from PRUSA_TOKEN: {e}")

    return None

from_file(path) classmethod

Factory: Load credentials from a JSON file.

Source code in src/prusa/connect/client/auth.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
@classmethod
def from_file(cls, path: Path | str) -> PrusaConnectCredentials | None:
    """Factory: Load credentials from a JSON file."""
    if isinstance(path, str):
        path = Path(path)

    try:
        logger.debug(f"Loading credentials from file: {path}")
        with path.open() as f:
            data = json.load(f)

        # Define a saver that updates this specific file
        def save_to_disk(new_data):
            logger.debug(f"Saving credentials to file: {path}")
            # Ensure directory exists (in case it was deleted)
            path.parent.mkdir(parents=True, exist_ok=True)
            with path.open("w") as f:
                json.dump(new_data, f, indent=2)

            # Best-effort secure permissions
            with contextlib.suppress(OSError):
                os.chmod(path, 0o600)

        return cls(data, token_saver=save_to_disk)
    except (FileNotFoundError, json.JSONDecodeError):
        logger.debug(f"No credentials found at file: {path.absolute()}")
        return None

load_default() classmethod

Factory: Attempt to load credentials from default locations.

Priority: 1. Environment Variables (PRUSA_TOKENS_JSON, PRUSA_TOKEN) 2. Platform default config file

Source code in src/prusa/connect/client/auth.py
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
@classmethod
def load_default(cls) -> PrusaConnectCredentials | None:
    """Factory: Attempt to load credentials from default locations.

    Priority:
    1. Environment Variables (PRUSA_TOKENS_JSON, PRUSA_TOKEN)
    2. Platform default config file
    """
    # 1. Environment
    if creds := cls.from_env():
        return creds

    # 2. Platform default file
    default_path = get_default_token_path()
    logger.debug("Checking for credentials at default path", path=default_path)
    if default_path.exists():
        return cls.from_file(default_path)

    return None

refresh()

Forces a token refresh using the refresh token.

Source code in src/prusa/connect/client/auth.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def refresh(self) -> None:
    """Forces a token refresh using the refresh token."""
    if not self.tokens.refresh_token or not self.tokens.refresh_token.raw_token:
        raise exceptions.PrusaAuthError("Cannot refresh token: No refresh token present.")

    if not _is_token_valid(self.tokens.refresh_token):
        logger.error("Refresh token expired", expires_at=self.tokens.refresh_token.expires_at)
        raise exceptions.PrusaAuthError("Cannot refresh token: Refresh token is expired.")

    logger.debug("Refreshing access token...", refresh_token_id=self.tokens.refresh_token.token_id)
    payload = {
        "grant_type": "refresh_token",
        "client_id": consts.CLIENT_ID,
        "refresh_token": self.tokens.refresh_token.raw_token,
    }

    resp = self._session.post(consts.TOKEN_URL, data=payload)

    if resp.status_code == 200:
        new_data = resp.json()
        # We need to update our PrusaJWTTokenSet.
        # The refresh response typically contains a new access_token and optionally a new refresh_token.

        # Use dump_tokens() to get current state as dict of raw strings
        current_raw = self.tokens.dump_tokens()
        # Update with new raw strings from response
        current_raw.update(new_data)

        # Reload from the updated raw dict
        self._load_tokens(current_raw)
        logger.info("Token refreshed successfully.")

        if self.token_saver:
            # pass raw dict back to saver
            self.token_saver(self.tokens.dump_tokens())
    else:
        logger.error("Token refresh failed", status=resp.status_code, body=resp.text)
        raise exceptions.PrusaAuthError("Failed to refresh token. Re-authentication required.")

PrusaIdentityToken

Bases: PrusaJwtModel

Structure of the Identity Token.

Source code in src/prusa/connect/client/auth.py
125
126
127
128
129
130
131
132
133
class PrusaIdentityToken(PrusaJwtModel):
    """Structure of the Identity Token."""

    token_id: str = pydantic.Field(alias="jti")
    user_id: int = pydantic.Field(alias="sub")
    expires_at: datetime.datetime = pydantic.Field(alias="exp")
    audience: str = pydantic.Field(alias="aud")
    user_info: dict[str, typing.Any] = pydantic.Field(alias="user")
    issuer: str = pydantic.Field(alias="iss")

PrusaJWTTokenSet

Bases: BaseModel

JWT token data structure.

METHOD DESCRIPTION
dump_tokens

Returns the raw tokens as a dictionary, suitable for saving to disk.

Source code in src/prusa/connect/client/auth.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class PrusaJWTTokenSet(pydantic.BaseModel):
    """JWT token data structure."""

    access_token: PrusaAccessToken
    refresh_token: PrusaRefreshToken | None = None
    identity_token: typing.Annotated[PrusaIdentityToken | None, pydantic.Field(alias="id_token")] = None
    expires_in: int | None = None
    token_type: str | None = None
    scope: typing.Annotated[
        list[str],
        pydantic.Field(default_factory=list),
        pydantic.BeforeValidator(lambda v: v.split() if isinstance(v, str) else v),
    ]
    shared_session_key: str | None = None

    def dump_tokens(self) -> dict[str, typing.Any]:
        """Returns the raw tokens as a dictionary, suitable for saving to disk."""
        data: dict[str, typing.Any] = {}
        if self.access_token and self.access_token.raw_token:
            data["access_token"] = self.access_token.raw_token
        if self.refresh_token and self.refresh_token.raw_token:
            data["refresh_token"] = self.refresh_token.raw_token
        if self.identity_token and self.identity_token.raw_token:
            data["id_token"] = self.identity_token.raw_token
        if self.expires_in is not None:
            data["expires_in"] = self.expires_in
        if self.token_type is not None:
            data["token_type"] = self.token_type
        if self.scope:
            data["scope"] = " ".join(self.scope)
        if self.shared_session_key is not None:
            data["shared_session_key"] = self.shared_session_key
        return data

dump_tokens()

Returns the raw tokens as a dictionary, suitable for saving to disk.

Source code in src/prusa/connect/client/auth.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def dump_tokens(self) -> dict[str, typing.Any]:
    """Returns the raw tokens as a dictionary, suitable for saving to disk."""
    data: dict[str, typing.Any] = {}
    if self.access_token and self.access_token.raw_token:
        data["access_token"] = self.access_token.raw_token
    if self.refresh_token and self.refresh_token.raw_token:
        data["refresh_token"] = self.refresh_token.raw_token
    if self.identity_token and self.identity_token.raw_token:
        data["id_token"] = self.identity_token.raw_token
    if self.expires_in is not None:
        data["expires_in"] = self.expires_in
    if self.token_type is not None:
        data["token_type"] = self.token_type
    if self.scope:
        data["scope"] = " ".join(self.scope)
    if self.shared_session_key is not None:
        data["shared_session_key"] = self.shared_session_key
    return data

PrusaJwtModel

Bases: BaseModel

Base model for JWT-based tokens that can parse raw strings.

METHOD DESCRIPTION
__init__

Allows initializing with a raw JWT string.

parse_jwt_string

Parses a raw JWT string into a dictionary of claims.

Source code in src/prusa/connect/client/auth.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class PrusaJwtModel(pydantic.BaseModel):
    """Base model for JWT-based tokens that can parse raw strings."""

    raw_token: str | None = pydantic.Field(default=None, exclude=True)

    @pydantic.model_validator(mode="before")
    @classmethod
    def parse_jwt_string(cls, data: typing.Any) -> typing.Any:
        """Parses a raw JWT string into a dictionary of claims."""
        if isinstance(data, str):
            try:
                claims = _decode_jwt(data)
                # Ensure we don't overwrite existing raw_token if somehow present
                if isinstance(claims, dict):
                    claims["raw_token"] = data
                return claims
            except Exception as e:
                raise ValueError(f"Invalid JWT format: {e}") from e
        return data

    def __init__(self, token: str | None = None, /, **data: typing.Any):
        """Allows initializing with a raw JWT string."""
        if token is not None:
            # Re-use the decoding logic or similar.
            # Since validation normally handles decoding for model_validate(str),
            # for __init__ we need to manually prep the dict if we want to support positional args.
            try:
                claims = _decode_jwt(token)
                data.update(claims)
                data["raw_token"] = token
            except Exception:
                # If it fails, we assume it might be intentional or let validation fail later.
                # But typically we want to support the raw string flow.
                pass
        super().__init__(**data)

__init__(token=None, /, **data)

Allows initializing with a raw JWT string.

Source code in src/prusa/connect/client/auth.py
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def __init__(self, token: str | None = None, /, **data: typing.Any):
    """Allows initializing with a raw JWT string."""
    if token is not None:
        # Re-use the decoding logic or similar.
        # Since validation normally handles decoding for model_validate(str),
        # for __init__ we need to manually prep the dict if we want to support positional args.
        try:
            claims = _decode_jwt(token)
            data.update(claims)
            data["raw_token"] = token
        except Exception:
            # If it fails, we assume it might be intentional or let validation fail later.
            # But typically we want to support the raw string flow.
            pass
    super().__init__(**data)

parse_jwt_string(data) classmethod

Parses a raw JWT string into a dictionary of claims.

Source code in src/prusa/connect/client/auth.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@pydantic.model_validator(mode="before")
@classmethod
def parse_jwt_string(cls, data: typing.Any) -> typing.Any:
    """Parses a raw JWT string into a dictionary of claims."""
    if isinstance(data, str):
        try:
            claims = _decode_jwt(data)
            # Ensure we don't overwrite existing raw_token if somehow present
            if isinstance(claims, dict):
                claims["raw_token"] = data
            return claims
        except Exception as e:
            raise ValueError(f"Invalid JWT format: {e}") from e
    return data

PrusaRefreshToken

Bases: PrusaJwtModel

Structure of the Refresh Token.

Source code in src/prusa/connect/client/auth.py
114
115
116
117
118
119
120
121
122
class PrusaRefreshToken(PrusaJwtModel):
    """Structure of the Refresh Token."""

    token_id: str = pydantic.Field(alias="jti")
    user_id: int = pydantic.Field(alias="sub")
    expires_at: datetime.datetime = pydantic.Field(alias="exp")
    session_id: str = pydantic.Field(alias="sid")
    app_slug: str = pydantic.Field(alias="app")
    token_type: str = pydantic.Field(alias="type")

get_default_token_path()

Returns the platform-specific path for the token file using platformdirs.

Source code in src/prusa/connect/client/auth.py
185
186
187
188
189
def get_default_token_path() -> Path:
    """Returns the platform-specific path for the token file using platformdirs."""
    config_dir = Path(platformdirs.user_config_dir(consts.APP_NAME, consts.APP_AUTHOR))
    config_dir.mkdir(parents=True, exist_ok=True)
    return config_dir / "prusa_tokens.json"

interactive_login(email, password, otp_callback)

Performs the full PKCE login flow, including screen scraping and TOTP.

PARAMETER DESCRIPTION
email

Prusa account email.

TYPE: str

password

Prusa account password.

TYPE: str

otp_callback

A function that returns the 6-digit 2FA code if requested.

TYPE: Callable[[], str]

RETURNS DESCRIPTION
PrusaJWTTokenSet

A dict containing the token response (access_token, refresh_token, etc).

RAISES DESCRIPTION
PrusaAuthError

If login fails.

Source code in src/prusa/connect/client/auth.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
def interactive_login(email: str, password: str, otp_callback: collections.abc.Callable[[], str]) -> PrusaJWTTokenSet:
    """Performs the full PKCE login flow, including screen scraping and TOTP.

    Args:
        email: Prusa account email.
        password: Prusa account password.
        otp_callback: A function that returns the 6-digit 2FA code if requested.

    Returns:
        A dict containing the token response (access_token, refresh_token, etc).

    Raises:
        PrusaAuthError: If login fails.
    """
    session = requests.Session()
    session.headers.update({"User-Agent": f"PrusaConnectClient/{__version__}"})

    # 1. Setup PKCE
    verifier, challenge = _generate_pkce()

    # 2. Get Login Page
    params = {
        "response_type": "code",
        "client_id": consts.CLIENT_ID,
        "redirect_uri": consts.REDIRECT_URI,
        "code_challenge_method": "S256",
        "code_challenge": challenge,
    }

    logger.info("Initiating login flow...")
    logger.debug("Login params", url=consts.AUTH_URL, params=params)
    resp = session.get(consts.AUTH_URL, params=params)
    if resp.status_code != 200:
        raise exceptions.PrusaAuthError(f"Could not load login page (Status: {resp.status_code})")

    csrf = _extract_csrf(resp.text)
    if not csrf:
        logger.error("CSRF token missing from login page")
        raise exceptions.PrusaAuthError("Could not find CSRF token on login page.")

    # 3. Submit Credentials
    payload = {
        "csrfmiddlewaretoken": csrf,
        "next": _extract_next(resp.text),
        "email": email,
        "password": password,
    }

    headers = {"Referer": resp.url, "Origin": "https://account.prusa3d.com"}
    resp = session.post(resp.url, data=payload, headers=headers)

    # 4. Check for errors or TOTP
    if 'class="invalid-feedback"' in resp.text:
        raise exceptions.PrusaAuthError("Invalid email or password.")

    if "/login/totp/" in resp.url:
        logger.info("2FA Challenge detected.")
        resp = _handle_totp(session, resp, otp_callback)

    # 5. Handle Callback
    parsed = urllib.parse.urlparse(resp.url)
    if not parsed.path.endswith("/auth-callback"):
        raise exceptions.PrusaAuthError("Login failed: Did not redirect to auth-callback.")

    query = urllib.parse.parse_qs(parsed.query)
    if "code" not in query:
        raise exceptions.PrusaAuthError("Login failed: No authorization code in callback.")

    auth_code = query["code"][0]

    # 6. Exchange Code
    token_resp = session.post(
        consts.TOKEN_URL,
        data={
            "grant_type": "authorization_code",
            "client_id": consts.CLIENT_ID,
            "code": auth_code,
            "code_verifier": verifier,
            "redirect_uri": consts.REDIRECT_URI,
        },
    )

    if token_resp.status_code != 200:
        raise exceptions.PrusaAuthError(f"Token exchange failed: {token_resp.text}")

    logger.info("Token exchange successful.", user_id=token_resp.json().get("sub"))
    logger.debug("Token response", json=token_resp.json())

    return PrusaJWTTokenSet(**token_resp.json())