Hello community,
here is the log from the commit of package python-confluent-kafka for
openSUSE:Factory checked in at 2019-11-27 13:51:52
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-confluent-kafka (Old)
and /work/SRC/openSUSE:Factory/.python-confluent-kafka.new.26869 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-confluent-kafka"
Wed Nov 27 13:51:52 2019 rev:2 rq:745369 version:1.1.0
Changes:
--------
---
/work/SRC/openSUSE:Factory/python-confluent-kafka/python-confluent-kafka.changes
2018-12-06 12:19:36.785371965 +0100
+++
/work/SRC/openSUSE:Factory/.python-confluent-kafka.new.26869/python-confluent-kafka.changes
2019-11-27 13:51:58.816437914 +0100
@@ -1,0 +2,25 @@
+Thu Oct 31 09:17:20 UTC 2019 - Dirk Mueller <[email protected]>
+
+- update to 1.1.0:
+ * confluent-kafka-python is based on librdkafka v1.1.0, see the librdkafka
v1.1.0 release notes for a complete list of changes, enhancements, fixes and
upgrade considerations.
+
+ * ssl.endpoint.identification.algorithm=https (off by default) to validate
the broker hostname matches the certificate. Requires OpenSSL >= 1.0.2(included
with Wheel installations))
+ * Improved GSSAPI/Kerberos ticket refresh
+ * Confluent monitoring interceptor package bumped to v0.11.1 (#634)
+
+ New configuration properties:
+
+ * ssl.key.pem - client's private key as a string in PEM format
+ * ssl.certificate.pem - client's public key as a string in PEM format
+ * enable.ssl.certificate.verification - enable(default)/disable OpenSSL's
builtin broker certificate verification.
+ * enable.ssl.endpoint.identification.algorithm - to verify the broker's
hostname with its certificate (disabled by default).
+ * Add new rd_kafka_conf_set_ssl_cert() to pass PKCS#12, DER or PEM certs in
(binary) memory form to the configuration object.
+ * The private key data is now securely cleared from memory after last use.
+
+ * SASL GSSAPI/Kerberos: Don't run kinit refresh for each broker, just per
client instance.
+ * SASL GSSAPI/Kerberos: Changed sasl.kerberos.kinit.cmd to first attempt
ticket refresh, then acquire.
+ * SASL: Proper locking on broker name acquisition.
+ * Consumer: max.poll.interval.ms now correctly handles blocking poll calls,
allowing a longer poll timeout than the max poll interval.
+ * configure: Fix libzstd static lib detection
+
+-------------------------------------------------------------------
Old:
----
confluent-kafka-0.11.6.tar.gz
New:
----
confluent-kafka-1.1.0.tar.gz
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Other differences:
------------------
++++++ python-confluent-kafka.spec ++++++
--- /var/tmp/diff_new_pack.J4CaLO/_old 2019-11-27 13:52:00.332436655 +0100
+++ /var/tmp/diff_new_pack.J4CaLO/_new 2019-11-27 13:52:00.380436615 +0100
@@ -1,7 +1,7 @@
#
# spec file for package python-confluent-kafka
#
-# Copyright (c) 2018 SUSE LINUX GmbH, Nuernberg, Germany.
+# Copyright (c) 2019 SUSE LINUX GmbH, Nuernberg, Germany.
#
# All modifications and additions to the file contributed by third parties
# remain the property of their copyright owners, unless otherwise agreed
@@ -18,7 +18,7 @@
%{?!python_module:%define python_module() python-%{**} python3-%{**}}
Name: python-confluent-kafka
-Version: 0.11.6
+Version: 1.1.0
Release: 0
Summary: Confluent's Apache Kafka client for Python
License: Apache-2.0
@@ -49,7 +49,7 @@
%prep
%setup -q -n confluent-kafka-%{version}
-%patch1 -p1
+%patch1
%build
export CFLAGS="%{optflags}"
++++++ confluent-kafka-0.11.6.tar.gz -> confluent-kafka-1.1.0.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/confluent-kafka-0.11.6/PKG-INFO
new/confluent-kafka-1.1.0/PKG-INFO
--- old/confluent-kafka-0.11.6/PKG-INFO 2018-11-09 14:36:54.000000000 +0100
+++ new/confluent-kafka-1.1.0/PKG-INFO 2019-07-15 16:50:57.000000000 +0200
@@ -1,12 +1,12 @@
Metadata-Version: 2.1
Name: confluent-kafka
-Version: 0.11.6
-Summary: Confluent's Apache Kafka client for Python
+Version: 1.1.0
+Summary: Confluent's Python client for Apache Kafka
Home-page: https://github.com/confluentinc/confluent-kafka-python
Author: Confluent Inc
Author-email: [email protected]
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
-Provides-Extra: avro
Provides-Extra: dev
+Provides-Extra: avro
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/confluent-kafka-0.11.6/README.md
new/confluent-kafka-1.1.0/README.md
--- old/confluent-kafka-0.11.6/README.md 2018-11-08 22:17:18.000000000
+0100
+++ new/confluent-kafka-1.1.0/README.md 2019-03-27 11:11:30.000000000 +0100
@@ -85,11 +85,8 @@
if msg is None:
continue
if msg.error():
- if msg.error().code() == KafkaError._PARTITION_EOF:
- continue
- else:
- print(msg.error())
- break
+ print("Consumer error: {}".format(msg.error()))
+ continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
@@ -172,11 +169,8 @@
continue
if msg.error():
- if msg.error().code() == KafkaError._PARTITION_EOF:
- continue
- else:
- print(msg.error())
- break
+ print("AvroConsumer error: {}".format(msg.error()))
+ continue
print(msg.value())
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/confluent-kafka-0.11.6/confluent_kafka/admin/__init__.py
new/confluent-kafka-1.1.0/confluent_kafka/admin/__init__.py
--- old/confluent-kafka-0.11.6/confluent_kafka/admin/__init__.py
2018-09-01 06:56:05.000000000 +0200
+++ new/confluent-kafka-1.1.0/confluent_kafka/admin/__init__.py 2019-03-27
11:11:30.000000000 +0100
@@ -556,7 +556,7 @@
of a broker id in the brokers dict.
"""
def __init__(self):
- self.partition = -1
+ self.id = -1
self.leader = -1
self.replicas = []
self.isrs = []
@@ -564,9 +564,9 @@
def __repr__(self):
if self.error is not None:
- return "PartitionMetadata({}, {})".format(self.partition,
self.error)
+ return "PartitionMetadata({}, {})".format(self.id, self.error)
else:
- return "PartitionMetadata({})".format(self.partition)
+ return "PartitionMetadata({})".format(self.id)
def __str__(self):
- return "{}".format(self.partition)
+ return "{}".format(self.id)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/confluent-kafka-0.11.6/confluent_kafka/avro/__init__.py
new/confluent-kafka-1.1.0/confluent_kafka/avro/__init__.py
--- old/confluent-kafka-0.11.6/confluent_kafka/avro/__init__.py 2018-11-08
22:17:18.000000000 +0100
+++ new/confluent-kafka-1.1.0/confluent_kafka/avro/__init__.py 2019-03-27
11:11:30.000000000 +0100
@@ -29,23 +29,23 @@
def __init__(self, config, default_key_schema=None,
default_value_schema=None, schema_registry=None):
- schema_registry_url = config.pop("schema.registry.url", None)
- schema_registry_ca_location =
config.pop("schema.registry.ssl.ca.location", None)
- schema_registry_certificate_location =
config.pop("schema.registry.ssl.certificate.location", None)
- schema_registry_key_location =
config.pop("schema.registry.ssl.key.location", None)
+ sr_conf = {key.replace("schema.registry.", ""): value
+ for key, value in config.items() if
key.startswith("schema.registry")}
- if schema_registry is None:
- if schema_registry_url is None:
- raise ValueError("Missing parameter: schema.registry.url")
+ if sr_conf.get("basic.auth.credentials.source") == 'SASL_INHERIT':
+ sr_conf['sasl.mechanisms'] = config.get('sasl.mechanisms', '')
+ sr_conf['sasl.username'] = config.get('sasl.username', '')
+ sr_conf['sasl.password'] = config.get('sasl.password', '')
+
+ ap_conf = {key: value
+ for key, value in config.items() if not
key.startswith("schema.registry")}
- schema_registry =
CachedSchemaRegistryClient(url=schema_registry_url,
-
ca_location=schema_registry_ca_location,
-
cert_location=schema_registry_certificate_location,
-
key_location=schema_registry_key_location)
- elif schema_registry_url is not None:
+ if schema_registry is None:
+ schema_registry = CachedSchemaRegistryClient(sr_conf)
+ elif sr_conf.get("url", None) is not None:
raise ValueError("Cannot pass schema_registry along with
schema.registry.url config")
- super(AvroProducer, self).__init__(config)
+ super(AvroProducer, self).__init__(ap_conf)
self._serializer = MessageSerializer(schema_registry)
self._key_schema = default_key_schema
self._value_schema = default_value_schema
@@ -98,28 +98,32 @@
Constructor takes below parameters
:param dict config: Config parameters containing url for schema registry
(``schema.registry.url``)
- and the standard Kafka client configuration
(``bootstrap.servers`` et.al).
+ and the standard Kafka client configuration
(``bootstrap.servers`` et.al)
+ :param schema reader_key_schema: a reader schema for the message key
+ :param schema reader_value_schema: a reader schema for the message value
+ :raises ValueError: For invalid configurations
"""
- def __init__(self, config, schema_registry=None):
- schema_registry_url = config.pop("schema.registry.url", None)
- schema_registry_ca_location =
config.pop("schema.registry.ssl.ca.location", None)
- schema_registry_certificate_location =
config.pop("schema.registry.ssl.certificate.location", None)
- schema_registry_key_location =
config.pop("schema.registry.ssl.key.location", None)
+ def __init__(self, config, schema_registry=None, reader_key_schema=None,
reader_value_schema=None):
- if schema_registry is None:
- if schema_registry_url is None:
- raise ValueError("Missing parameter: schema.registry.url")
+ sr_conf = {key.replace("schema.registry.", ""): value
+ for key, value in config.items() if
key.startswith("schema.registry")}
+
+ if sr_conf.get("basic.auth.credentials.source") == 'SASL_INHERIT':
+ sr_conf['sasl.mechanisms'] = config.get('sasl.mechanisms', '')
+ sr_conf['sasl.username'] = config.get('sasl.username', '')
+ sr_conf['sasl.password'] = config.get('sasl.password', '')
- schema_registry =
CachedSchemaRegistryClient(url=schema_registry_url,
-
ca_location=schema_registry_ca_location,
-
cert_location=schema_registry_certificate_location,
-
key_location=schema_registry_key_location)
- elif schema_registry_url is not None:
+ ap_conf = {key: value
+ for key, value in config.items() if not
key.startswith("schema.registry")}
+
+ if schema_registry is None:
+ schema_registry = CachedSchemaRegistryClient(sr_conf)
+ elif sr_conf.get("url", None) is not None:
raise ValueError("Cannot pass schema_registry along with
schema.registry.url config")
- super(AvroConsumer, self).__init__(config)
- self._serializer = MessageSerializer(schema_registry)
+ super(AvroConsumer, self).__init__(ap_conf)
+ self._serializer = MessageSerializer(schema_registry,
reader_key_schema, reader_value_schema)
def poll(self, timeout=None):
"""
@@ -135,13 +139,19 @@
message = super(AvroConsumer, self).poll(timeout)
if message is None:
return None
- if not message.value() and not message.key():
- return message
+
if not message.error():
- if message.value() is not None:
- decoded_value =
self._serializer.decode_message(message.value())
- message.set_value(decoded_value)
- if message.key() is not None:
- decoded_key = self._serializer.decode_message(message.key())
- message.set_key(decoded_key)
+ try:
+ if message.value() is not None:
+ decoded_value =
self._serializer.decode_message(message.value(), is_key=False)
+ message.set_value(decoded_value)
+ if message.key() is not None:
+ decoded_key =
self._serializer.decode_message(message.key(), is_key=True)
+ message.set_key(decoded_key)
+ except SerializerError as e:
+ raise SerializerError("Message deserialization failed for
message at {} [{}] offset {}: {}".format(
+ message.topic(),
+ message.partition(),
+ message.offset(),
+ e))
return message
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/confluent-kafka-0.11.6/confluent_kafka/avro/cached_schema_registry_client.py
new/confluent-kafka-1.1.0/confluent_kafka/avro/cached_schema_registry_client.py
---
old/confluent-kafka-0.11.6/confluent_kafka/avro/cached_schema_registry_client.py
2018-11-08 22:17:18.000000000 +0100
+++
new/confluent-kafka-1.1.0/confluent_kafka/avro/cached_schema_registry_client.py
2019-05-30 12:30:32.000000000 +0200
@@ -21,15 +21,23 @@
#
import json
import logging
+import warnings
from collections import defaultdict
-import requests
+from requests import Session, utils
from .error import ClientError
from . import loads
+# Python 2 considers int an instance of str
+try:
+ string_type = basestring # noqa
+except NameError:
+ string_type = str
+
VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD']
VALID_METHODS = ['GET', 'POST', 'PUT', 'DELETE']
+VALID_AUTH_PROVIDERS = ['URL', 'USER_INFO', 'SASL_INHERIT']
# Common accept header sent
ACCEPT_HDR = "application/vnd.schemaregistry.v1+json,
application/vnd.schemaregistry+json, application/json"
@@ -40,19 +48,51 @@
"""
A client that talks to a Schema Registry over HTTP
- See http://confluent.io/docs/current/schema-registry/docs/intro.html
+ See http://confluent.io/docs/current/schema-registry/docs/intro.html for
more information.
+
+ .. deprecated::
+ Use CachedSchemaRegistryClient(dict: config) instead.
+ Existing params ca_location, cert_location and key_location will be
replaced with their librdkafka equivalents:
+ `ssl.ca.location`, `ssl.certificate.location` and `ssl.key.location`
respectively.
Errors communicating to the server will result in a ClientError being
raised.
- @:param: url: url to schema registry
+ :param str|dict url: url(deprecated) to schema registry or dictionary
containing client configuration.
+ :param str ca_location: File or directory path to CA certificate(s) for
verifying the Schema Registry key.
+ :param str cert_location: Path to client's public key used for
authentication.
+ :param str key_location: Path to client's private key used for
authentication.
"""
def __init__(self, url, max_schemas_per_subject=1000, ca_location=None,
cert_location=None, key_location=None):
- """Construct a client by passing in the base URL of the schema
registry server"""
+ # In order to maintain compatibility the url(conf in future versions)
param has been preserved for now.
+ conf = url
+ if not isinstance(url, dict):
+ conf = {
+ 'url': url,
+ 'ssl.ca.location': ca_location,
+ 'ssl.certificate.location': cert_location,
+ 'ssl.key.location': key_location
+ }
+ warnings.warn(
+ "CachedSchemaRegistry constructor is being deprecated. "
+ "Use CachedSchemaRegistryClient(dict: config) instead. "
+ "Existing params ca_location, cert_location and key_location
will be replaced with their "
+ "librdkafka equivalents as keys in the conf dict:
`ssl.ca.location`, `ssl.certificate.location` and "
+ "`ssl.key.location` respectively",
+ category=DeprecationWarning, stacklevel=2)
+
+ """Construct a Schema Registry client"""
+
+ # Ensure URL valid scheme is included; http[s]
+ url = conf.get('url', '')
+ if not isinstance(url, string_type):
+ raise TypeError("URL must be of type str")
+
+ if not url.startswith('http'):
+ raise ValueError("Invalid URL provided for Schema Registry")
self.url = url.rstrip('/')
- self.max_schemas_per_subject = max_schemas_per_subject
# subj => { schema => id }
self.subject_to_schema_ids = defaultdict(dict)
# id => avro_schema
@@ -60,17 +100,20 @@
# subj => { schema => version }
self.subject_to_schema_versions = defaultdict(dict)
- s = requests.Session()
- if ca_location is not None:
- s.verify = ca_location
- if cert_location is not None or key_location is not None:
- if cert_location is None or key_location is None:
- raise ValueError(
- "Both schema.registry.ssl.certificate.location and
schema.registry.ssl.key.location must be set")
- s.cert = (cert_location, key_location)
+ s = Session()
+ ca_path = conf.pop('ssl.ca.location', None)
+ if ca_path is not None:
+ s.verify = ca_path
+ s.cert = self._configure_client_tls(conf)
+ s.auth = self._configure_basic_auth(conf)
+
+ self.url = conf.pop('url')
self._session = s
+ if len(conf) > 0:
+ raise ValueError("Unrecognized configuration properties:
{}".format(conf.keys()))
+
def __del__(self):
self.close()
@@ -83,6 +126,33 @@
def close(self):
self._session.close()
+ @staticmethod
+ def _configure_basic_auth(conf):
+ url = conf['url']
+ auth_provider = conf.pop('basic.auth.credentials.source',
'URL').upper()
+ if auth_provider not in VALID_AUTH_PROVIDERS:
+ raise ValueError("schema.registry.basic.auth.credentials.source
must be one of {}"
+ .format(VALID_AUTH_PROVIDERS))
+ if auth_provider == 'SASL_INHERIT':
+ if conf.pop('sasl.mechanism', '').upper() is ['GSSAPI']:
+ raise ValueError("SASL_INHERIT does not support SASL
mechanisms GSSAPI")
+ auth = (conf.pop('sasl.username', ''), conf.pop('sasl.password',
''))
+ elif auth_provider == 'USER_INFO':
+ auth = tuple(conf.pop('basic.auth.user.info', '').split(':'))
+ else:
+ auth = utils.get_auth_from_url(url)
+ conf['url'] = utils.urldefragauth(url)
+ return auth
+
+ @staticmethod
+ def _configure_client_tls(conf):
+ cert = conf.pop('ssl.certificate.location', None),
conf.pop('ssl.key.location', None)
+ # Both values can be None or no values can be None
+ if bool(cert[0]) != bool(cert[1]):
+ raise ValueError(
+ "Both schema.registry.ssl.certificate.location and
schema.registry.ssl.key.location must be set")
+ return cert
+
def _send_request(self, url, method='GET', body=None, headers={}):
if method not in VALID_METHODS:
raise ClientError("Method {} is invalid; valid methods include
{}".format(method, VALID_METHODS))
@@ -94,9 +164,14 @@
_headers.update(headers)
response = self._session.request(method, url, headers=_headers,
json=body)
- return response.json(), response.status_code
+ # Returned by Jetty not SR so the payload is not json encoded
+ try:
+ return response.json(), response.status_code
+ except ValueError:
+ return response.content, response.status_code
- def _add_to_cache(self, cache, subject, schema, value):
+ @staticmethod
+ def _add_to_cache(cache, subject, schema, value):
sub_cache = cache[subject]
sub_cache[schema] = value
@@ -124,9 +199,10 @@
Multiple instances of the same schema will result in cache misses.
- @:param: subject: subject name
- @:param: avro_schema: Avro schema to be registered
- @:returns: schema_id: int value
+ :param str subject: subject name
+ :param schema avro_schema: Avro schema to be registered
+ :returns: schema_id
+ :rtype: int
"""
schemas_to_id = self.subject_to_schema_ids[subject]
@@ -139,7 +215,9 @@
body = {'schema': json.dumps(avro_schema.to_json())}
result, code = self._send_request(url, method='POST', body=body)
- if code == 409:
+ if (code == 401 or code == 403):
+ raise ClientError("Unauthorized access. Error code:" + str(code))
+ elif code == 409:
raise ClientError("Incompatible Avro schema:" + str(code))
elif code == 422:
raise ClientError("Invalid Avro schema:" + str(code))
@@ -151,12 +229,30 @@
self._cache_schema(avro_schema, schema_id, subject)
return schema_id
+ def delete_subject(self, subject):
+ """
+ DELETE /subjects/(string: subject)
+ Deletes the specified subject and its associated compatibility level
if registered.
+ It is recommended to use this API only when a topic needs to be
recycled or in development environments.
+ :param subject: subject name
+ :returns: version of the schema deleted under this subject
+ :rtype: (int)
+ """
+
+ url = '/'.join([self.url, 'subjects', subject])
+
+ result, code = self._send_request(url, method="DELETE")
+ if not (code >= 200 and code <= 299):
+ raise ClientError('Unable to delete subject: {}'.format(result))
+ return result
+
def get_by_id(self, schema_id):
"""
GET /schemas/ids/{int: id}
Retrieve a parsed avro schema by id or None if not found
- @:param: schema_id: int value
- @:returns: Avro schema
+ :param int schema_id: int value
+ :returns: Avro schema
+ :rtype: schema
"""
if schema_id in self.id_to_schema:
return self.id_to_schema[schema_id]
@@ -193,8 +289,9 @@
This call always contacts the registry.
If the subject is not found, (None,None,None) is returned.
- @:param: subject: subject name
- @:returns: (schema_id, schema, version)
+ :param str subject: subject name
+ :returns: (schema_id, schema, version)
+ :rtype: (string, schema, int)
"""
url = '/'.join([self.url, 'subjects', subject, 'versions', 'latest'])
@@ -228,9 +325,10 @@
Get the version of a schema for a given subject.
Returns None if not found.
- @:param: subject: subject name
- @:param: avro_schema: Avro schema
- @:returns: version
+ :param str subject: subject name
+ :param: schema avro_schema: Avro schema
+ :returns: version
+ :rtype: int
"""
schemas_to_version = self.subject_to_schema_versions[subject]
version = schemas_to_version.get(avro_schema, None)
@@ -259,9 +357,10 @@
Test the compatibility of a candidate parsed schema for a given
subject.
By default the latest version is checked against.
- @:param: subject: subject name
- @:param: avro_schema: Avro schema
- @:return: True if compatible, False if not compatible
+ :param: str subject: subject name
+ :param: schema avro_schema: Avro schema
+ :return: True if compatible, False if not compatible
+ :rtype: bool
"""
url = '/'.join([self.url, 'compatibility', 'subjects', subject,
'versions', str(version)])
@@ -289,7 +388,7 @@
Update the compatibility level for a subject. Level must be one of:
- @:param: level: ex: 'NONE','FULL','FORWARD', or 'BACKWARD'
+ :param str level: ex: 'NONE','FULL','FORWARD', or 'BACKWARD'
"""
if level not in VALID_LEVELS:
raise ClientError("Invalid level specified: %s" % (str(level)))
@@ -310,20 +409,21 @@
GET /config
Get the current compatibility level for a subject. Result will be one
of:
- @:param: subject: subject name
- @:raises: ClientError: if the request was unsuccessful or an invalid
compatibility level was returned
- @:return: 'NONE','FULL','FORWARD', or 'BACKWARD'
+ :param str subject: subject name
+ :raises ClientError: if the request was unsuccessful or an invalid
compatibility level was returned
+ :returns: one of 'NONE','FULL','FORWARD', or 'BACKWARD'
+ :rtype: bool
"""
url = '/'.join([self.url, 'config'])
if subject:
- url += '/' + subject
+ url = '/'.join([url, subject])
result, code = self._send_request(url)
is_successful_request = code >= 200 and code <= 299
if not is_successful_request:
raise ClientError('Unable to fetch compatibility level. Error
code: %d' % code)
- compatibility = result.get('compatibility', None)
+ compatibility = result.get('compatibilityLevel', None)
if compatibility not in VALID_LEVELS:
if compatibility is None:
error_msg_suffix = 'No compatibility was returned'
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/confluent-kafka-0.11.6/confluent_kafka/avro/serializer/message_serializer.py
new/confluent-kafka-1.1.0/confluent_kafka/avro/serializer/message_serializer.py
---
old/confluent-kafka-0.11.6/confluent_kafka/avro/serializer/message_serializer.py
2018-11-08 16:21:13.000000000 +0100
+++
new/confluent-kafka-1.1.0/confluent_kafka/avro/serializer/message_serializer.py
2019-03-27 11:11:30.000000000 +0100
@@ -39,7 +39,7 @@
HAS_FAST = False
try:
- from fastavro import schemaless_reader
+ from fastavro import schemaless_reader, schemaless_writer
HAS_FAST = True
except ImportError:
@@ -68,14 +68,20 @@
All decode_* methods expect a buffer received from kafka.
"""
- def __init__(self, registry_client):
+ def __init__(self, registry_client, reader_key_schema=None,
reader_value_schema=None):
self.registry_client = registry_client
self.id_to_decoder_func = {}
self.id_to_writers = {}
+ self.reader_key_schema = reader_key_schema
+ self.reader_value_schema = reader_value_schema
- '''
-
- '''
+ # Encoder support
+ def _get_encoder_func(self, writer_schema):
+ if HAS_FAST:
+ schema = writer_schema.to_json()
+ return lambda record, fp: schemaless_writer(fp, schema, record)
+ writer = avro.io.DatumWriter(writer_schema)
+ return lambda record, fp: writer.write(record,
avro.io.BinaryEncoder(fp))
def encode_record_with_schema(self, topic, schema, record, is_key=False):
"""
@@ -83,11 +89,12 @@
record is expected to be a dictionary.
The schema is registered with the subject of 'topic-value'
- @:param topic : Topic name
- @:param schema : Avro Schema
- @:param record : An object to serialize
- @:param is_key : If the record is a key
- @:returns : Encoded record with schema ID as bytes
+ :param str topic: Topic name
+ :param schema schema: Avro Schema
+ :param dict record: An object to serialize
+ :param bool is_key: If the record is a key
+ :returns: Encoded record with schema ID as bytes
+ :rtype: bytes
"""
serialize_err = KeySerializerError if is_key else ValueSerializerError
@@ -101,7 +108,7 @@
raise serialize_err(message)
# cache writer
- self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
+ self.id_to_writers[schema_id] = self._get_encoder_func(schema)
return self.encode_record_with_schema_id(schema_id, record,
is_key=is_key)
@@ -109,10 +116,11 @@
"""
Encode a record with a given schema id. The record must
be a python dictionary.
- @:param: schema_id : integer ID
- @:param: record : An object to serialize
- @:param is_key : If the record is a key
- @:returns: decoder function
+ :param int schema_id: integer ID
+ :param dict record: An object to serialize
+ :param bool is_key: If the record is a key
+ :returns: decoder function
+ :rtype: func
"""
serialize_err = KeySerializerError if is_key else ValueSerializerError
@@ -124,7 +132,7 @@
schema = self.registry_client.get_by_id(schema_id)
if not schema:
raise serialize_err("Schema does not exist")
- self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
+ self.id_to_writers[schema_id] = self._get_encoder_func(schema)
except ClientError:
exc_type, exc_value, exc_traceback = sys.exc_info()
raise serialize_err(repr(traceback.format_exception(exc_type,
exc_value, exc_traceback)))
@@ -132,44 +140,38 @@
# get the writer
writer = self.id_to_writers[schema_id]
with ContextStringIO() as outf:
- # write the header
- # magic byte
-
- outf.write(struct.pack('b', MAGIC_BYTE))
-
- # write the schema ID in network byte order (big end)
+ # Write the magic byte and schema ID in network byte order (big
endian)
+ outf.write(struct.pack('>bI', MAGIC_BYTE, schema_id))
- outf.write(struct.pack('>I', schema_id))
-
- # write the record to the rest of it
- # Create an encoder that we'll write to
- encoder = avro.io.BinaryEncoder(outf)
- # write the magic byte
- # write the object in 'obj' as Avro to the fake file...
- writer.write(record, encoder)
+ # write the record to the rest of the buffer
+ writer(record, outf)
return outf.getvalue()
# Decoder support
- def _get_decoder_func(self, schema_id, payload):
+ def _get_decoder_func(self, schema_id, payload, is_key=False):
if schema_id in self.id_to_decoder_func:
return self.id_to_decoder_func[schema_id]
- # fetch from schema reg
+ # fetch writer schema from schema reg
try:
- schema = self.registry_client.get_by_id(schema_id)
+ writer_schema_obj = self.registry_client.get_by_id(schema_id)
except ClientError as e:
raise SerializerError("unable to fetch schema with id %d: %s" %
(schema_id, str(e)))
- if schema is None:
+ if writer_schema_obj is None:
raise SerializerError("unable to fetch schema with id %d" %
(schema_id))
curr_pos = payload.tell()
+
+ reader_schema_obj = self.reader_key_schema if is_key else
self.reader_value_schema
+
if HAS_FAST:
# try to use fast avro
try:
- schema_dict = schema.to_json()
- schemaless_reader(payload, schema_dict)
+ writer_schema = writer_schema_obj.to_json()
+ reader_schema = reader_schema_obj.to_json()
+ schemaless_reader(payload, writer_schema)
# If we reach this point, this means we have fastavro and it
can
# do this deserialization. Rewind since this method just
determines
@@ -177,7 +179,8 @@
# normal path.
payload.seek(curr_pos)
- self.id_to_decoder_func[schema_id] = lambda p:
schemaless_reader(p, schema_dict)
+ self.id_to_decoder_func[schema_id] = lambda p:
schemaless_reader(
+ p, writer_schema, reader_schema)
return self.id_to_decoder_func[schema_id]
except Exception:
# Fast avro failed, fall thru to standard avro below.
@@ -186,7 +189,13 @@
# here means we should just delegate to slow avro
# rewind
payload.seek(curr_pos)
- avro_reader = avro.io.DatumReader(schema)
+ # Avro DatumReader py2/py3 inconsistency, hence no param keywords
+ # should be revisited later
+ # https://github.com/apache/avro/blob/master/lang/py3/avro/io.py#L459
+ #
https://github.com/apache/avro/blob/master/lang/py/src/avro/io.py#L423
+ # def __init__(self, writers_schema=None, readers_schema=None)
+ # def __init__(self, writer_schema=None, reader_schema=None)
+ avro_reader = avro.io.DatumReader(writer_schema_obj, reader_schema_obj)
def decoder(p):
bin_decoder = avro.io.BinaryDecoder(p)
@@ -195,11 +204,13 @@
self.id_to_decoder_func[schema_id] = decoder
return self.id_to_decoder_func[schema_id]
- def decode_message(self, message):
+ def decode_message(self, message, is_key=False):
"""
Decode a message from kafka that has been encoded for use with
the schema registry.
- @:param: message
+ :param str|bytes or None message: message key or value to be decoded
+ :returns: Decoded message contents.
+ :rtype dict:
"""
if message is None:
@@ -212,5 +223,5 @@
magic, schema_id = struct.unpack('>bI', payload.read(5))
if magic != MAGIC_BYTE:
raise SerializerError("message does not start with magic byte")
- decoder_func = self._get_decoder_func(schema_id, payload)
+ decoder_func = self._get_decoder_func(schema_id, payload, is_key)
return decoder_func(payload)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/confluent-kafka-0.11.6/confluent_kafka/kafkatest/verifiable_client.py
new/confluent-kafka-1.1.0/confluent_kafka/kafkatest/verifiable_client.py
--- old/confluent-kafka-0.11.6/confluent_kafka/kafkatest/verifiable_client.py
2018-11-08 22:17:18.000000000 +0100
+++ new/confluent-kafka-1.1.0/confluent_kafka/kafkatest/verifiable_client.py
2019-03-27 11:11:30.000000000 +0100
@@ -72,10 +72,7 @@
continue
if n.startswith('topicconf_'):
- # Apply topicconf_ properties on default.topic.config
- if 'default.topic.config' not in conf:
- conf['default.topic.config'] = dict()
- conf['default.topic.config'][n[10:]] = v
+ conf[n[10:]] = v
continue
if not n.startswith('conf_'):
@@ -91,10 +88,6 @@
# "org.apache.kafka.clients.consumer.RangeAssignor" -> "range"
conf[n] =
re.sub(r'org.apache.kafka.clients.consumer.(\w+)Assignor',
lambda x: x.group(1).lower(), v)
-
- elif n == 'enable.idempotence':
- # Ignore idempotence for now, best-effortly.
- sys.stderr.write('%% WARN: Ignoring unsupported %s=%s\n' % (n,
v))
else:
conf[n] = v
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/confluent-kafka-0.11.6/confluent_kafka/kafkatest/verifiable_consumer.py
new/confluent-kafka-1.1.0/confluent_kafka/kafkatest/verifiable_consumer.py
--- old/confluent-kafka-0.11.6/confluent_kafka/kafkatest/verifiable_consumer.py
2018-11-09 14:36:10.000000000 +0100
+++ new/confluent-kafka-1.1.0/confluent_kafka/kafkatest/verifiable_consumer.py
2019-03-27 11:11:30.000000000 +0100
@@ -29,7 +29,7 @@
"""
def __init__(self, conf):
"""
- \p conf is a config dict passed to confluent_kafka.Consumer()
+ conf is a config dict passed to confluent_kafka.Consumer()
"""
super(VerifiableConsumer, self).__init__(conf)
self.conf['on_commit'] = self.on_commit
@@ -44,7 +44,7 @@
self.assignment_dict = dict()
def find_assignment(self, topic, partition):
- """ Find and return existing assignment based on \p topic and \p
partition,
+ """ Find and return existing assignment based on topic and partition,
or None on miss. """
skey = '%s %d' % (topic, partition)
return self.assignment_dict.get(skey)
@@ -74,7 +74,7 @@
self.consumed_msgs_last_reported = self.consumed_msgs
def send_assignment(self, evtype, partitions):
- """ Send assignment update, \p evtype is either 'assigned' or
'revoked' """
+ """ Send assignment update, evtype is either 'assigned' or 'revoked'
"""
d = {'name': 'partitions_' + evtype,
'partitions': [{'topic': x.topic, 'partition': x.partition} for x
in partitions]}
self.send(d)
@@ -188,11 +188,7 @@
def msg_consume(self, msg):
""" Handle consumed message (or error event) """
if msg.error():
- if msg.error().code() == KafkaError._PARTITION_EOF:
- # ignore EOF
- pass
- else:
- self.err('Consume failed: %s' % msg.error(), term=False)
+ self.err('Consume failed: %s' % msg.error(), term=False)
return
if False:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/confluent-kafka-0.11.6/confluent_kafka/kafkatest/verifiable_producer.py
new/confluent-kafka-1.1.0/confluent_kafka/kafkatest/verifiable_producer.py
--- old/confluent-kafka-0.11.6/confluent_kafka/kafkatest/verifiable_producer.py
2018-11-09 14:36:10.000000000 +0100
+++ new/confluent-kafka-1.1.0/confluent_kafka/kafkatest/verifiable_producer.py
2019-03-27 11:11:30.000000000 +0100
@@ -28,7 +28,7 @@
"""
def __init__(self, conf):
"""
- \p conf is a config dict passed to confluent_kafka.Producer()
+ conf is a config dict passed to confluent_kafka.Producer()
"""
super(VerifiableProducer, self).__init__(conf)
self.conf['on_delivery'] = self.dr_cb
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/confluent-kafka-0.11.6/confluent_kafka/src/Consumer.c
new/confluent-kafka-1.1.0/confluent_kafka/src/Consumer.c
--- old/confluent-kafka-0.11.6/confluent_kafka/src/Consumer.c 2018-11-08
22:17:18.000000000 +0100
+++ new/confluent-kafka-1.1.0/confluent_kafka/src/Consumer.c 2019-06-12
14:30:14.000000000 +0200
@@ -1009,7 +1009,7 @@
static PyMethodDef Consumer_methods[] = {
{ "subscribe", (PyCFunction)Consumer_subscribe,
METH_VARARGS|METH_KEYWORDS,
- ".. py:function:: subscribe(topics, [listener=None])\n"
+ ".. py:function:: subscribe(topics, [on_assign=None],
[on_revoke=None])\n"
"\n"
" Set subscription to supplied list of topics\n"
" This replaces a previous subscription.\n"
@@ -1236,7 +1236,8 @@
" :param bool cached: Instead of querying the broker used cached
information. "
"Cached values: The low offset is updated periodically (if
statistics.interval.ms is set) while "
"the high offset is updated on each message fetched from the broker
for this partition.\n"
- " :returns: Tuple of (low,high) on success or None on timeout.\n"
+ " :returns: Tuple of (low,high) on success or None on timeout. "
+ "The high offset is the offset of the last message + 1.\n"
" :rtype: tuple(int,int)\n"
" :raises: KafkaException\n"
" :raises: RuntimeError if called on a closed consumer\n"
@@ -1387,6 +1388,7 @@
rd_kafka_poll_set_consumer(self->rk);
self->u.Consumer.rkqu = rd_kafka_queue_get_consumer(self->rk);
+ assert(self->u.Consumer.rkqu);
return 0;
}
@@ -1423,8 +1425,8 @@
"\n"
".. py:function:: Consumer(config)\n"
"\n"
- " :param dict config: Configuration properties. At a minimum
``bootstrap.servers`` and "
- "``group.id`` **should** be set"
+ " :param dict config: Configuration properties. At a minimum
``group.id`` **must** be set,"
+ " ``bootstrap.servers`` **should** be set"
"\n"
"Create new Consumer instance using provided configuration dict.\n"
"\n"
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/confluent-kafka-0.11.6/confluent_kafka/src/Producer.c
new/confluent-kafka-1.1.0/confluent_kafka/src/Producer.c
--- old/confluent-kafka-0.11.6/confluent_kafka/src/Producer.c 2018-11-08
19:24:44.000000000 +0100
+++ new/confluent-kafka-1.1.0/confluent_kafka/src/Producer.c 2019-04-01
23:16:11.000000000 +0200
@@ -146,9 +146,7 @@
msgobj = Message_new0(self, rkm);
- args = Py_BuildValue("(OO)",
- Message_error((Message *)msgobj, NULL),
- msgobj);
+ args = Py_BuildValue("(OO)", ((Message *)msgobj)->error, msgobj);
Py_DECREF(msgobj);
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/confluent-kafka-0.11.6/confluent_kafka/src/confluent_kafka.c
new/confluent-kafka-1.1.0/confluent_kafka/src/confluent_kafka.c
--- old/confluent-kafka-0.11.6/confluent_kafka/src/confluent_kafka.c
2018-11-09 14:36:10.000000000 +0100
+++ new/confluent-kafka-1.1.0/confluent_kafka/src/confluent_kafka.c
2019-07-15 12:31:13.000000000 +0200
@@ -62,9 +62,12 @@
char *str; /* Human readable representation of error, if one
* was provided by librdkafka.
* Else falls back on err2str(). */
+ int fatal; /**< Set to true if a fatal error. */
} KafkaError;
+static void cfl_PyErr_Fatal (rd_kafka_resp_err_t err, const char *reason);
+
static PyObject *KafkaError_code (KafkaError *self, PyObject *ignore) {
return cfl_PyInt_FromInt(self->code);
}
@@ -81,6 +84,20 @@
return cfl_PyUnistr(_FromString(rd_kafka_err2name(self->code)));
}
+static PyObject *KafkaError_fatal (KafkaError *self, PyObject *ignore) {
+ PyObject *ret = self->fatal ? Py_True : Py_False;
+ Py_INCREF(ret);
+ return ret;
+}
+
+
+static PyObject *KafkaError_test_raise_fatal (KafkaError *null,
+ PyObject *ignore) {
+ cfl_PyErr_Fatal(RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "This is a fatal exception for testing purposes");
+ return NULL;
+}
+
static PyMethodDef KafkaError_methods[] = {
{ "code", (PyCFunction)KafkaError_code, METH_NOARGS,
@@ -105,6 +122,14 @@
" :rtype: str\n"
"\n"
},
+ { "fatal", (PyCFunction)KafkaError_fatal, METH_NOARGS,
+ " :returns: True if this a fatal error, else False.\n"
+ " :rtype: bool\n"
+ "\n"
+ },
+ { "_test_raise_fatal", (PyCFunction)KafkaError_test_raise_fatal,
+ METH_NOARGS|METH_STATIC
+ },
{ NULL }
};
@@ -133,7 +158,8 @@
}
static PyObject *KafkaError_str0 (KafkaError *self) {
- return cfl_PyUnistr(_FromFormat("KafkaError{code=%s,val=%d,str=\"%s\"}",
+ return
cfl_PyUnistr(_FromFormat("KafkaError{%scode=%s,val=%d,str=\"%s\"}",
+ self->fatal?"FATAL,":"",
rd_kafka_err2name(self->code),
self->code,
self->str ? self->str :
@@ -247,6 +273,7 @@
static void KafkaError_init (KafkaError *self,
rd_kafka_resp_err_t code, const char *str) {
self->code = code;
+ self->fatal = 0;
if (str)
self->str = strdup(str);
else
@@ -292,6 +319,17 @@
}
+/**
+ * @brief Raise exception from fatal error.
+ */
+static void cfl_PyErr_Fatal (rd_kafka_resp_err_t err, const char *reason) {
+ PyObject *eo = KafkaError_new0(err, "%s", reason);
+ ((KafkaError *)eo)->fatal = 1;
+ PyErr_SetObject(KafkaException, eo);
+}
+
+
+
/****************************************************************************
@@ -305,6 +343,10 @@
****************************************************************************/
+/**
+ * @returns a Message's error object, if any, else None.
+ * @remark The error object refcount is increased by this function.
+ */
PyObject *Message_error (Message *self, PyObject *ignore) {
if (self->error) {
Py_INCREF(self->error);
@@ -1162,6 +1204,7 @@
}
#endif
+
/****************************************************************************
*
*
@@ -1177,6 +1220,16 @@
CallState *cs;
cs = CallState_get(h);
+
+ /* If the client raised a fatal error we'll raise an exception
+ * rather than calling the error callback. */
+ if (err == RD_KAFKA_RESP_ERR__FATAL) {
+ char errstr[512];
+ err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
+ cfl_PyErr_Fatal(err, errstr);
+ goto crash;
+ }
+
if (!h->error_cb) {
/* No callback defined */
goto done;
@@ -1189,6 +1242,7 @@
if (result)
Py_DECREF(result);
else {
+ crash:
CallState_crash(cs);
rd_kafka_yield(h->rk);
}
@@ -1579,6 +1633,15 @@
PyDict_Update(confdict, kwargs);
}
+ if (ktype == RD_KAFKA_CONSUMER &&
+ !PyDict_GetItemString(confdict, "group.id")) {
+
+ PyErr_SetString(PyExc_ValueError,
+ "Failed to create consumer: group.id must be
set");
+ Py_DECREF(confdict);
+ return NULL;
+ }
+
conf = rd_kafka_conf_new();
/*
@@ -1766,8 +1829,7 @@
Py_DECREF(confdict);
- if (h->error_cb)
- rd_kafka_conf_set_error_cb(conf, error_cb);
+ rd_kafka_conf_set_error_cb(conf, error_cb);
if (h->throttle_cb)
rd_kafka_conf_set_throttle_cb(conf, throttle_cb);
@@ -1942,6 +2004,9 @@
* @brief Get attribute \p attr_name from \p object and verify it is
* of type \p py_type.
*
+ * @param py_type the value type of \p attr_name must match \p py_type, unless
+ * \p py_type is NULL.
+ *
* @returns 1 if \p valp was updated with the object (new reference) or NULL
* if not matched and not required, or
* 0 if an exception was raised.
@@ -1963,7 +2028,7 @@
return 0;
}
- if (Py_TYPE(o) != py_type) {
+ if (py_type && Py_TYPE(o) != py_type) {
Py_DECREF(o);
PyErr_Format(PyExc_TypeError,
"Expected .%s to be %s type, not %s",
@@ -2047,7 +2112,10 @@
#ifdef PY3
&PyUnicode_Type,
#else
- &PyString_Type,
+ /* Python 2: support both str and unicode
+ * let PyObject_Unistr() do the
+ * proper conversion below. */
+ NULL,
#endif
required))
return 0;
@@ -2119,8 +2187,13 @@
rd_kafka_version());
}
+/*
+ * Version hex representation
+ * 0xMMmmRRPP
+ * MM=major, mm=minor, RR=revision, PP=patchlevel (not used)
+ */
static PyObject *version (PyObject *self, PyObject *args) {
- return Py_BuildValue("si", "0.11.6", 0x000b0600);
+ return Py_BuildValue("si", "1.1.0", 0x01010000);
}
static PyMethodDef cimpl_methods[] = {
@@ -2219,7 +2292,7 @@
static struct PyModuleDef cimpl_moduledef = {
PyModuleDef_HEAD_INIT,
"cimpl", /* m_name */
- "Confluent's Apache Kafka Python client (C implementation)", /* m_doc */
+ "Confluent's Python client for Apache Kafka (C implementation)", /*
m_doc */
-1, /* m_size */
cimpl_methods, /* m_methods */
};
@@ -2250,7 +2323,7 @@
m = PyModule_Create(&cimpl_moduledef);
#else
m = Py_InitModule3("cimpl", cimpl_methods,
- "Confluent's Apache Kafka Python client (C
implementation)");
+ "Confluent's Python client for Apache Kafka (C
implementation)");
#endif
if (!m)
return NULL;
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/confluent-kafka-0.11.6/confluent_kafka/src/confluent_kafka.h
new/confluent-kafka-1.1.0/confluent_kafka/src/confluent_kafka.h
--- old/confluent-kafka-0.11.6/confluent_kafka/src/confluent_kafka.h
2018-11-08 22:17:18.000000000 +0100
+++ new/confluent-kafka-1.1.0/confluent_kafka/src/confluent_kafka.h
2019-03-27 11:11:30.000000000 +0100
@@ -40,19 +40,19 @@
* Make sure to keep the MIN_RD_KAFKA_VERSION, MIN_VER_ERRSTR and #error
* defines and strings in sync.
*/
-#define MIN_RD_KAFKA_VERSION 0x000b0500
+#define MIN_RD_KAFKA_VERSION 0x01000000
#ifdef __APPLE__
-#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v0.11.5 or
later. Install the latest version of librdkafka from Homebrew by running `brew
install librdkafka` or `brew upgrade librdkafka`"
+#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v1.0.0 or
later. Install the latest version of librdkafka from Homebrew by running `brew
install librdkafka` or `brew upgrade librdkafka`"
#else
-#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v0.11.5 or
later. Install the latest version of librdkafka from the Confluent
repositories, see http://docs.confluent.io/current/installation.html"
+#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v1.0.0 or
later. Install the latest version of librdkafka from the Confluent
repositories, see http://docs.confluent.io/current/installation.html"
#endif
#if RD_KAFKA_VERSION < MIN_RD_KAFKA_VERSION
#ifdef __APPLE__
-#error "confluent-kafka-python requires librdkafka v0.11.5 or later. Install
the latest version of librdkafka from Homebrew by running `brew install
librdkafka` or `brew upgrade librdkafka`"
+#error "confluent-kafka-python requires librdkafka v1.0.0 or later. Install
the latest version of librdkafka from Homebrew by running `brew install
librdkafka` or `brew upgrade librdkafka`"
#else
-#error "confluent-kafka-python requires librdkafka v0.11.5 or later. Install
the latest version of librdkafka from the Confluent repositories, see
http://docs.confluent.io/current/installation.html"
+#error "confluent-kafka-python requires librdkafka v1.0.0 or later. Install
the latest version of librdkafka from the Confluent repositories, see
http://docs.confluent.io/current/installation.html"
#endif
#endif
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/confluent-kafka-0.11.6/confluent_kafka.egg-info/PKG-INFO
new/confluent-kafka-1.1.0/confluent_kafka.egg-info/PKG-INFO
--- old/confluent-kafka-0.11.6/confluent_kafka.egg-info/PKG-INFO
2018-11-09 14:36:54.000000000 +0100
+++ new/confluent-kafka-1.1.0/confluent_kafka.egg-info/PKG-INFO 2019-07-15
16:50:57.000000000 +0200
@@ -1,12 +1,12 @@
Metadata-Version: 2.1
Name: confluent-kafka
-Version: 0.11.6
-Summary: Confluent's Apache Kafka client for Python
+Version: 1.1.0
+Summary: Confluent's Python client for Apache Kafka
Home-page: https://github.com/confluentinc/confluent-kafka-python
Author: Confluent Inc
Author-email: [email protected]
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
-Provides-Extra: avro
Provides-Extra: dev
+Provides-Extra: avro
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/confluent-kafka-0.11.6/confluent_kafka.egg-info/requires.txt
new/confluent-kafka-1.1.0/confluent_kafka.egg-info/requires.txt
--- old/confluent-kafka-0.11.6/confluent_kafka.egg-info/requires.txt
2018-11-09 14:36:54.000000000 +0100
+++ new/confluent-kafka-1.1.0/confluent_kafka.egg-info/requires.txt
2019-07-15 16:50:57.000000000 +0200
@@ -1,11 +1,21 @@
+
+[:python_version < "3.2"]
futures
+requests
+
+[:python_version < "3.4"]
enum34
[avro]
fastavro
requests
+
+[avro:python_version < "3.0"]
avro
+[avro:python_version > "3.0"]
+avro-python3
+
[dev]
-pytest
+pytest==4.6.4
flake8
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/confluent-kafka-0.11.6/setup.py
new/confluent-kafka-1.1.0/setup.py
--- old/confluent-kafka-0.11.6/setup.py 2018-11-09 14:36:10.000000000 +0100
+++ new/confluent-kafka-1.1.0/setup.py 2019-07-15 12:31:13.000000000 +0200
@@ -3,16 +3,13 @@
import os
from setuptools import setup, find_packages
from distutils.core import Extension
-import sys
import platform
-INSTALL_REQUIRES = list()
-
-if sys.version_info[0] < 3:
- avro = 'avro'
- INSTALL_REQUIRES.extend(['futures', 'enum34'])
-else:
- avro = 'avro-python3'
+INSTALL_REQUIRES = [
+ 'futures;python_version<"3.2"',
+ 'enum34;python_version<"3.4"',
+ 'requests;python_version<"3.2"'
+]
# On Un*x the library is linked as -lrdkafka,
# while on windows we need the full librdkafka name.
@@ -41,8 +38,8 @@
setup(name='confluent-kafka',
- version='0.11.6',
- description='Confluent\'s Apache Kafka client for Python',
+ version='1.1.0',
+ description='Confluent\'s Python client for Apache Kafka',
author='Confluent Inc',
author_email='[email protected]',
url='https://github.com/confluentinc/confluent-kafka-python',
@@ -51,6 +48,11 @@
data_files=[('', ['LICENSE.txt'])],
install_requires=INSTALL_REQUIRES,
extras_require={
- 'avro': ['fastavro', 'requests', avro],
+ 'avro': [
+ 'fastavro',
+ 'requests',
+ 'avro;python_version<"3.0"',
+ 'avro-python3;python_version>"3.0"'
+ ],
'dev': get_install_requirements("test-requirements.txt")
})
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/confluent-kafka-0.11.6/test-requirements.txt
new/confluent-kafka-1.1.0/test-requirements.txt
--- old/confluent-kafka-0.11.6/test-requirements.txt 2018-08-30
15:05:20.000000000 +0200
+++ new/confluent-kafka-1.1.0/test-requirements.txt 2019-07-15
12:31:13.000000000 +0200
@@ -1,2 +1,2 @@
-pytest
-flake8
\ No newline at end of file
+pytest==4.6.4
+flake8
++++++ no-license-as-datafile.patch ++++++
--- /var/tmp/diff_new_pack.J4CaLO/_old 2019-11-27 13:52:01.260435885 +0100
+++ /var/tmp/diff_new_pack.J4CaLO/_new 2019-11-27 13:52:01.272435875 +0100
@@ -1,12 +1,10 @@
-Index: confluent-kafka-0.11.6/setup.py
-===================================================================
---- confluent-kafka-0.11.6.orig/setup.py
-+++ confluent-kafka-0.11.6/setup.py
-@@ -48,7 +48,6 @@ setup(name='confluent-kafka',
+--- setup.py
++++ setup.py
+@@ -45,7 +45,6 @@
url='https://github.com/confluentinc/confluent-kafka-python',
ext_modules=[module],
packages=find_packages(exclude=("tests", "tests.*")),
- data_files=[('', ['LICENSE.txt'])],
install_requires=INSTALL_REQUIRES,
extras_require={
- 'avro': ['fastavro', 'requests', avro],
+ 'avro': [