pykafka.simpleconsumer
-
class pykafka.simpleconsumer.SimpleConsumer(topic, cluster, consumer_group=None, partitions=None, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, auto_start=True, reset_offset_on_start=False)
Bases: object
A non-balancing consumer for Kafka
-
__del__()
Stop consumption and workers when object is deleted
-
__init__(topic, cluster, consumer_group=None, partitions=None, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, auto_start=True, reset_offset_on_start=False)
Create a SimpleConsumer.
Settings and default values are taken from the Scala consumer implementation. Consumer group is included because it’s necessary for offset management, but doesn’t imply that this is a balancing consumer. Use a BalancedConsumer for that.
Parameters:
- topic (pykafka.topic.Topic) – The topic this consumer should consume
- cluster (pykafka.cluster.Cluster) – The cluster to which this consumer should connect
- consumer_group (bytes) – The name of the consumer group this consumer should use for offset committing and fetching.
- partitions (Iterable of pykafka.partition.Partition) – Existing partitions to which to connect
- fetch_message_max_bytes (int) – The number of bytes of messages to attempt to fetch
- num_consumer_fetchers (int) – The number of workers used to make FetchRequests
- auto_commit_enable (bool) – If true, periodically commit to kafka the offset of messages already fetched by this consumer. This also requires that consumer_group is not None.
- auto_commit_interval_ms (int) – The frequency (in milliseconds) at which the consumer offsets are committed to kafka. This setting is ignored if auto_commit_enable is False.
- queued_max_messages (int) – Maximum number of messages buffered for consumption
- fetch_min_bytes (int) – The minimum amount of data (in bytes) the server should return for a fetch request. If insufficient data is available the request will block until sufficient data is available.
- fetch_wait_max_ms (int) – The maximum amount of time (in milliseconds) the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch_min_bytes.
- offsets_channel_backoff_ms (int) – Backoff time (in milliseconds) to retry offset commits/fetches
- offsets_commit_max_retries (int) – Retry the offset commit up to this many times on failure.
- auto_offset_reset (pykafka.common.OffsetType) – What to do if an offset is out of range. This setting indicates how to reset the consumer’s internal offset counter when an OffsetOutOfRangeError is encountered.
- consumer_timeout_ms (int) – Amount of time (in milliseconds) the consumer may spend without messages available for consumption before returning None.
- auto_start (bool) – Whether the consumer should begin communicating with kafka after __init__ is complete. If false, communication can be started with start().
- reset_offset_on_start (bool) – Whether the consumer should reset its internal offset counter to self._auto_offset_reset and commit that offset immediately upon starting up
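As a minimal sketch of how these parameters might be supplied in practice: the helper names `consumer_settings` and `build_consumer`, the broker address, and the topic and group names below are illustrative assumptions, not part of pykafka. `KafkaClient` and `Topic.get_simple_consumer` are the usual way to obtain a consumer; the import is deferred so that the settings helper can be exercised without a broker.

```python
def consumer_settings(consumer_group=None, auto_commit_enable=False,
                      auto_commit_interval_ms=60000, consumer_timeout_ms=-1):
    """Hypothetical helper: collect keyword arguments for a SimpleConsumer."""
    # The parameter notes state that auto-commit requires a consumer group,
    # so this helper rejects the combination up front.
    if auto_commit_enable and consumer_group is None:
        raise ValueError("auto_commit_enable requires a consumer_group")
    return {
        "consumer_group": consumer_group,
        "auto_commit_enable": auto_commit_enable,
        "auto_commit_interval_ms": auto_commit_interval_ms,
        "consumer_timeout_ms": consumer_timeout_ms,
    }


def build_consumer(hosts, topic_name, **settings):
    """Connect to a cluster and return a consumer (needs a reachable broker)."""
    from pykafka import KafkaClient  # deferred: only needed when connecting

    client = KafkaClient(hosts=hosts)
    topic = client.topics[topic_name]
    return topic.get_simple_consumer(**settings)


settings = consumer_settings(consumer_group=b"example-group",
                             auto_commit_enable=True,
                             consumer_timeout_ms=5000)
# With a broker available (address and topic name are placeholders):
# consumer = build_consumer("127.0.0.1:9092", b"example-topic", **settings)
```

Validating the settings before connecting keeps the failure local to the caller instead of surfacing later from the offset-management machinery.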
-
__iter__()
Yield an infinite stream of messages until the consumer times out
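Iteration might be used as in the sketch below. `collect_values` and `FakeMessage` are hypothetical names; the list of fake messages stands in for a real consumer so the example runs without a broker, and the `value` attribute is assumed to match what pykafka messages expose. Because the stream only ends when the consumer times out, the sketch takes an optional `limit` so that the loop can end on its own terms.

```python
from collections import namedtuple

# Stand-in for pykafka message objects; only `value` is read below.
FakeMessage = namedtuple("FakeMessage", ["offset", "value"])


def collect_values(consumer, limit=None):
    """Iterate over a consumer and gather message values, up to `limit`."""
    values = []
    for message in consumer:
        values.append(message.value)
        if limit is not None and len(values) >= limit:
            break
    return values


# A plain list plays the role of the consumer in this demonstration.
fake_consumer = [FakeMessage(0, b"a"), FakeMessage(1, b"b"), FakeMessage(2, b"c")]
first_two = collect_values(fake_consumer, limit=2)
```

With a real consumer, setting a non-negative `consumer_timeout_ms` is the documented way to make the iteration stop when no messages arrive.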
-
__weakref__
list of weak references to the object (if defined)
-
_auto_commit()
Commit offsets only if it’s time to do so
-
_build_default_error_handlers()
Set up the error handlers to use for partition errors.
-
_discover_offset_manager()
Set the offset manager for this consumer.
If a consumer group is not supplied to __init__, this method does nothing
-
_raise_worker_exceptions()
Raises exceptions encountered on worker threads
-
_setup_autocommit_worker()
Start the autocommitter thread
-
_setup_fetch_workers()
Start the fetcher threads
-
_update()
Update the consumer and cluster after an ERROR_CODE
-
commit_offsets()
Commit offsets for this consumer’s partitions
Uses the offset commit/fetch API
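When `auto_commit_enable` is left at its default of False, offsets are committed only when `commit_offsets()` is called, which per the constructor notes also requires a consumer group. One possible pattern is to commit after every batch of handled messages. The sketch below assumes that `consume(block=False)` returns None when nothing is available; `RecordingConsumer` and `process_with_commits` are hypothetical names, and the stand-in class only mimics the two methods used.

```python
class RecordingConsumer:
    """Stand-in exposing consume() and commit_offsets() like a consumer would."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.commits = 0

    def consume(self, block=True):
        # Return the next queued message, or None when nothing is left.
        return self._messages.pop(0) if self._messages else None

    def commit_offsets(self):
        self.commits += 1


def process_with_commits(consumer, handler, commit_every=100):
    """Handle messages and commit after every `commit_every` of them."""
    handled = 0
    while True:
        message = consumer.consume(block=False)
        if message is None:
            break
        handler(message)
        handled += 1
        if handled % commit_every == 0:
            consumer.commit_offsets()
    if handled % commit_every != 0:
        consumer.commit_offsets()  # commit the final partial batch
    return handled


seen = []
commit_demo = RecordingConsumer(["m1", "m2", "m3", "m4", "m5"])
handled_count = process_with_commits(commit_demo, seen.append, commit_every=2)
```

Committing only after the handler returns means a crash mid-batch leads to reprocessing rather than silent loss, at the cost of possible duplicates.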
-
consume(block=True)
Get one message from the consumer.
Parameters: block (bool) – Whether to block while waiting for a message
-
fetch()
Fetch new messages for all partitions
Create a FetchRequest for each broker and send it. Enqueue each of the returned messages in the appropriate OwnedPartition.
-
fetch_offsets()
Fetch offsets for this consumer’s topic
Uses the offset commit/fetch API
Returns: List of (id, pykafka.protocol.OffsetFetchPartitionResponse) tuples
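The returned list of tuples can be awkward to query directly, so a small conversion to a mapping may help. This is a sketch under assumptions: `committed_offsets` is a hypothetical helper, `FakeOffsetResponse` is a stand-in for `pykafka.protocol.OffsetFetchPartitionResponse`, and the `offset` field name should be checked against the installed pykafka version.

```python
from collections import namedtuple

# Stand-in for the response type; the field names are assumptions.
FakeOffsetResponse = namedtuple("FakeOffsetResponse", ["offset", "metadata", "err"])


def committed_offsets(fetch_result):
    """Turn [(partition_id, response), ...] into {partition_id: offset}."""
    return {partition_id: response.offset
            for partition_id, response in fetch_result}


sample_result = [(0, FakeOffsetResponse(41, b"", 0)),
                 (1, FakeOffsetResponse(7, b"", 0))]
by_partition = committed_offsets(sample_result)
# With a real consumer: committed_offsets(consumer.fetch_offsets())
```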
-
held_offsets
Return a map from partition id to held offset for each partition
-
partitions
A list of the partitions that this consumer consumes
-
reset_offsets(partition_offsets=None)
Reset offsets for the specified partitions
Issue an OffsetRequest for each partition and set the appropriate returned offset in the consumer’s internal offset counter.
Parameters: partition_offsets (Iterable of (pykafka.partition.Partition, int)) – (partition, timestamp_or_offset) pairs to reset, where partition is the partition for which to reset the offset and timestamp_or_offset is EITHER the timestamp of the message whose offset the partition should have OR the new offset the partition should have.
NOTE: If an instance of timestamp_or_offset is treated by kafka as an invalid offset timestamp, this function directly sets the consumer’s internal offset counter for that partition to that instance of timestamp_or_offset. On the next fetch request, the consumer attempts to fetch messages starting from that offset. See the following link for more information on what kafka treats as a valid offset timestamp: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
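The pair format described above can be built with a short helper. In this sketch, `reset_pairs` and `FakeResettableConsumer` are hypothetical names, and the strings "p0" and "p1" stand in for `pykafka.partition.Partition` objects. The constants mirror the special timestamps of the Kafka OffsetRequest (-2 for earliest, -1 for latest). The helper takes any iterable of partitions; if your pykafka version exposes `partitions` as a mapping, passing its values is presumably what you want.

```python
EARLIEST = -2  # Kafka's special OffsetRequest timestamp: earliest available offset
LATEST = -1    # Kafka's special OffsetRequest timestamp: latest offset


def reset_pairs(partitions, timestamp_or_offset):
    """Build the (partition, timestamp_or_offset) pairs reset_offsets expects."""
    return [(partition, timestamp_or_offset) for partition in partitions]


class FakeResettableConsumer:
    """Stand-in that records what reset_offsets() was asked to do."""

    def __init__(self, partitions):
        self.partitions = partitions
        self.requested = None

    def reset_offsets(self, partition_offsets=None):
        self.requested = list(partition_offsets or [])


reset_demo = FakeResettableConsumer(partitions=["p0", "p1"])
reset_demo.reset_offsets(reset_pairs(reset_demo.partitions, EARLIEST))
```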
-
start()
Begin communicating with Kafka, including setting up worker threads
Fetches offsets, starts an offset autocommitter worker pool, and starts a message fetcher worker pool.
-
stop()
Flag all running workers for deletion.
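For a consumer created with `auto_start=False`, pairing `start()` with `stop()` explicitly avoids relying on `__del__` for cleanup. One way to sketch that is a small context manager; `running` and `FakeLifecycleConsumer` are hypothetical names, and the stand-in class only records the calls it receives.

```python
from contextlib import contextmanager


@contextmanager
def running(consumer):
    """Start a consumer and guarantee stop() is called afterwards."""
    consumer.start()
    try:
        yield consumer
    finally:
        consumer.stop()


class FakeLifecycleConsumer:
    """Stand-in that records the order of lifecycle calls."""

    def __init__(self):
        self.events = []

    def start(self):
        self.events.append("start")

    def stop(self):
        self.events.append("stop")


lifecycle_demo = FakeLifecycleConsumer()
with running(lifecycle_demo) as active:
    active.events.append("consume")
```

The `finally` clause ensures the workers are flagged for shutdown even if message handling raises.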
-
topic
The topic this consumer consumes
-