"""Source code for langchain_community.utilities.cassandra."""
from __future__ import annotations
import asyncio
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable
if TYPE_CHECKING:
    from cassandra.cluster import ResponseFuture, Session
async def wrapped_response_future(
    func: Callable[..., ResponseFuture], *args: Any, **kwargs: Any
) -> Any:
    """Wrap a Cassandra response future in an asyncio future.

    ``func`` is called immediately with the given arguments. Callbacks are
    then registered on the response future it returns, and the outcome is
    forwarded to an asyncio future that this coroutine awaits.

    Args:
        func: The Cassandra function to call. It must return an object
            exposing ``add_callbacks(callback, errback)`` and ``result()``.
        *args: The arguments to pass to the Cassandra function.
        **kwargs: The keyword arguments to pass to the Cassandra function.

    Returns:
        The value of ``result()`` on the response future returned by ``func``.

    Raises:
        BaseException: Whatever exception is handed to the error callback.
    """
    # Inside a coroutine the running loop is always the right one;
    # get_event_loop() is the legacy spelling and may pick another loop.
    loop = asyncio.get_running_loop()
    asyncio_future = loop.create_future()
    response_future = func(*args, **kwargs)

    def _resolve(setter: Callable[[Any], None], value: Any) -> None:
        # Runs on the event loop. The awaiting task may have been cancelled
        # while the query was in flight; setting a result on a finished
        # future would raise InvalidStateError inside the loop.
        if not asyncio_future.done():
            setter(value)

    # The handlers may fire on a thread other than the loop's, so they only
    # schedule work through call_soon_threadsafe.
    def success_handler(_: Any) -> None:
        loop.call_soon_threadsafe(
            _resolve, asyncio_future.set_result, response_future.result()
        )

    def error_handler(exc: BaseException) -> None:
        loop.call_soon_threadsafe(_resolve, asyncio_future.set_exception, exc)

    response_future.add_callbacks(success_handler, error_handler)
    return await asyncio_future
async def aexecute_cql(session: Session, query: str, **kwargs: Any) -> Any:
    """Execute a CQL query asynchronously.

    Calls ``session.execute_async(query, **kwargs)`` through
    ``wrapped_response_future`` so the result can be awaited.

    Args:
        session: The Cassandra session to use.
        query: The CQL query to execute.
        **kwargs: Additional keyword arguments forwarded unchanged to
            ``session.execute_async``.

    Returns:
        The result of the query.
    """
    return await wrapped_response_future(session.execute_async, query, **kwargs)
class SetupMode(Enum):
    """Enumeration of setup modes.

    NOTE(review): no caller is visible in this module; presumably the member
    selects whether some setup step runs synchronously, asynchronously, or
    not at all -- confirm against the code that consumes this enum.
    """

    SYNC = 1
    ASYNC = 2
    OFF = 3