Use multiple collections with MongoDB Kafka Connector

According to the documentation, if you don't provide a value for the collection setting, the connector reads from all collections in the database:

"name of the collection in the database to watch for changes. If not set then all collections will be watched."

I looked at the connector source code, and it confirms this:

https://github.com/mongodb/mongo-kafka/blob/k133/src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java#L462

However, when the collection is not provided, I get an error like this:

ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
org.apache.kafka.connect.errors.ConnectException: com.mongodb.MongoCommandException: Command failed with error 73 (InvalidNamespace): '{aggregate: 1} is not valid for '$changeStream'; a collection is required.' on server localhost:27018. The full response is {"operationTime": {"$timestamp": {"t": 1603928795, "i": 1}}, "ok": 0.0, "errmsg": "{aggregate: 1} is not valid for '$changeStream'; a collection is required.", "code": 73, "codeName": "InvalidNamespace", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1603928795, "i": 1}}, "signature": {"hash": {"$binary": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "$type": "00"}, "keyId": {"$numberLong": "0"}}}}

This is my configuration file:

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb://localhost:27017,localhost:27018/order
database=order
collection=

topic.prefix=redemption
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup
collation=
copy.existing=true
errors.tolerance=all

If I set a collection, the connector works and generates topics.

From the logs, it appears the connector is connecting to the database:

INFO Watching for database changes on 'order' (com.mongodb.kafka.connect.source.MongoSourceTask:620)

The relevant source code:

else if (collection.isEmpty()) {
  LOGGER.info("Watching for database changes on '{}'", database);
  MongoDatabase db = mongoClient.getDatabase(database);
  changeStream = pipeline.map(db::watch).orElse(db.watch());
} else
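
To make my reading of that branch explicit, here is a minimal, driver-free sketch of how I understand the scope selection. The class WatchScopeSketch, the enum Scope and the method resolve are hypothetical names of my own, not part of mongo-kafka. The first branch (an empty database meaning the whole deployment) is my assumption about the code that precedes the excerpt, since the excerpt itself does not show it.

```java
/** Hypothetical illustration only; these names do not exist in the connector. */
public final class WatchScopeSketch {

    enum Scope { CLUSTER, DATABASE, COLLECTION }

    /**
     * Picks the change stream scope from the two config values.
     * Assumption: an empty database means "watch the whole deployment".
     * The empty-collection case matches the excerpt above (db.watch()).
     */
    static Scope resolve(String database, String collection) {
        if (database.isEmpty()) {
            return Scope.CLUSTER;
        }
        if (collection.isEmpty()) {
            return Scope.DATABASE;
        }
        return Scope.COLLECTION;
    }

    public static void main(String[] args) {
        // With my config (database=order, collection=) this is the database-wide case.
        System.out.println(resolve("order", ""));
        // "some_collection" is a placeholder name for the case that works for me.
        System.out.println(resolve("order", "some_collection"));
    }
}
```

With my configuration the sketch lands in the DATABASE case, which agrees with the "Watching for database changes on 'order'" log line, so the failure seems to come from the database-wide watch call itself rather than from how the connector reads the config.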

If I run the same call in my mongo shell, I get the following:

rs0:SECONDARY> db.watch()
2020-10-28T18:13:50.344-0600 E QUERY    [thread1] TypeError: db.watch is not a function :
@(shell):1:1
rs0:SECONDARY> db.watch
test.watch

