This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.wiki.git
The following commit(s) were added to refs/heads/master by this push: new a11329a validate connectors a11329a is described below commit a11329a56f31739322b39b7feb359772ef15ec4f Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Thu Jul 19 13:13:48 2018 -0700 validate connectors --- Release-Candidate-Validation.md | 140 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/Release-Candidate-Validation.md b/Release-Candidate-Validation.md index db9bb91..39fd899 100644 --- a/Release-Candidate-Validation.md +++ b/Release-Candidate-Validation.md @@ -273,3 +273,143 @@ cqlsh> USE pulsar_test_keyspace; cqlsh:pulsar_test_keyspace> CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text); ``` + +3. Prepare a cassandra sink yaml file and put it under examples directory as `cassandra-sink.yml`. + +```shell +$ cat examples/cassandra-sink.yml +configs: + roots: "localhost:9042" + keyspace: "pulsar_test_keyspace" + columnFamily: "pulsar_test_table" + keyname: "key" + columnName: "col" +``` + +4. Submit a cassandra sink + +```shell +$ bin/pulsar-admin sink create --tenant public --namespace default --name cassandra-test-sink --sink-type cassandra --sinkConfigFile examples/cassandra-sink.yml --inputs test_cassandra +"Created successfully" +``` + +```shell +// get the sink info +$ bin/pulsar-admin functions get --tenant public --namespace default --name cassandra-test-sink +{ + "tenant": "public", + "namespace": "default", + "name": "cassandra-test-sink", + "className": "org.apache.pulsar.functions.api.utils.IdentityFunction", + "autoAck": true, + "parallelism": 1, + "source": { + "topicsToSerDeClassName": { + "test_cassandra": "" + } + }, + "sink": { + "configs": "{\"roots\":\"cassandra\",\"keyspace\":\"pulsar_test_keyspace\",\"columnFamily\":\"pulsar_test_table\",\"keyname\":\"key\",\"columnName\":\"col\"}", + "builtin": "cassandra" + }, + "resources": {} +} +``` + +```shell +// get the running status +$ bin/pulsar-admin functions getstatus --tenant public --namespace default --name cassandra-test-sink +{ + "functionStatusList": [ + { + "running": true, + "instanceId": "0", + "metrics": { + "metrics": { + "__total_processed__": {}, + "__total_successfully_processed__": {}, + "__total_system_exceptions__": {}, + "__total_user_exceptions__": {}, + "__total_serialization_exceptions__": {}, + "__avg_latency_ms__": {} + } + }, + "workerId": "c-standalone-fw-localhost-6750" + } + ] +} +``` + +5. Produce messages to the source topic +```shell +$ for i in {0..10}; do bin/pulsar-client produce -m "key-$i" -n 1 test_cassandra; done +``` + +6. Check the sink status. It should show 11 messages are processed. + +```shell +$ bin/pulsar-admin functions getstatus --tenant public --namespace default --name cassandra-test-sink +{ + "functionStatusList": [ + { + "running": true, + "numProcessed": "11", + "numSuccessfullyProcessed": "11", + "lastInvocationTime": "1532031040117", + "instanceId": "0", + "metrics": { + "metrics": { + "__total_processed__": { + "count": 5.0, + "sum": 5.0, + "max": 5.0 + }, + "__total_successfully_processed__": { + "count": 5.0, + "sum": 5.0, + "max": 5.0 + }, + "__total_system_exceptions__": {}, + "__total_user_exceptions__": {}, + "__total_serialization_exceptions__": {}, + "__avg_latency_ms__": {} + } + }, + "workerId": "c-standalone-fw-localhost-6750" + } + ] +} +``` + +7. Check results in cassandra + +```shell +$ docker exec -ti cassandra cqlsh localhost +Connected to Test Cluster at localhost:9042. +[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4] +Use HELP for help. +cqlsh> use pulsar_test_keyspace; +cqlsh:pulsar_test_keyspace> select * from pulsar_test_table; + + key | col +--------+-------- + key-5 | key-5 + key-0 | key-0 + key-9 | key-9 + key-2 | key-2 + key-1 | key-1 + key-3 | key-3 + key-6 | key-6 + key-7 | key-7 + key-4 | key-4 + key-8 | key-8 + key-10 | key-10 + +(11 rows) +``` +8. Delete the Sink + +```shell +$ bin/pulsar-admin sink delete --tenant public --namespace default --name cassandra-test-sink +"Deleted successfully" +```