NodeJS Kafka Producer - Using `kafka-node`

Now that we have Kafka and NodeJS ready, let's send some data to our Kafka cluster.
Below is a basic producer.
The server details are as follows.
  1. nodejs is the NodeJS server.
  2. kafka is the Kafka server (single node).

Step 1 : Copy the script below into a file called producer_nodejs.js.

 /*
     Basic producer to send data to kafka from nodejs.
     More Information Here : https://www.npmjs.com/package/kafka-node
 */

 //    Using kafka-node - really nice library
 //    create a producer and connect to a Zookeeper to send the payloads.
 var kafka = require('kafka-node'),
     Producer = kafka.Producer,
     client = new kafka.Client('kafka:2181'),
     producer = new Producer(client);

     /*
         Creating a payload, which takes the information below
         'topic'     -->    the topic we have created in kafka. (test)
         'messages'  -->    data which needs to be sent to kafka. (a plain string here)
         'partition' -->    which partition to send the request to. (default: 0)

         example command to create a topic in kafka:
         [kafka@kafka kafka]$ bin/kafka-topics.sh \
                     --create --zookeeper localhost:2181 \
                     --replication-factor 1 \
                     --partitions 1 \
                     --topic test

         If the topic has multiple partitions, the payloads can be changed
         here so that requests are spread across the partitions.
     */
 var payloads = [
     { topic: 'test', messages: 'This is the First Message I am sending', partition: 0 },
 ];


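 //    once the producer is 'ready', send the payloads to kafka.
 producer.on('ready', function(){
     producer.send(payloads, function(err, data){
         if (err) {
             console.error(err);
             return;
         }
         console.log(data);
     });
 });

 //    log producer errors instead of silently ignoring them.
 producer.on('error', function(err){
     console.error(err);
 });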
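
The comment in the script mentions spreading requests across partitions when a topic has more than one. A minimal sketch of one way to do that is shown here, assuming a simple round-robin assignment and JSON-serialized messages. The `buildPayloads` helper and the sample records are hypothetical and not part of kafka-node, and the partition count of 2 is an assumption (the topic created above has only 1 partition).

```javascript
// Hypothetical helper (not part of kafka-node): builds a payload array
// that assigns each record to a partition in round-robin order.
function buildPayloads(topic, records, partitionCount) {
    if (!Number.isInteger(partitionCount) || partitionCount < 1) {
        throw new Error('partitionCount must be a positive integer');
    }
    return records.map(function (record, index) {
        return {
            topic: topic,
            // Serialize each record to a JSON string before sending.
            messages: JSON.stringify(record),
            // index 0 -> partition 0, index 1 -> partition 1, then wrap around.
            partition: index % partitionCount
        };
    });
}

// Example: three sample records for a topic assumed to have 2 partitions.
var payloads = buildPayloads('test', [
    { id: 1, text: 'first' },
    { id: 2, text: 'second' },
    { id: 3, text: 'third' }
], 2);

console.log(payloads);
```

The resulting array has the same shape as the payloads array in the script, so it could be passed to producer.send in its place, provided the topic was created with a matching --partitions value.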

Step 2 : Start the Kafka cluster as we did in Installation of Kafka. The steps below assume the topic is named test.

Step 3 : Start the console consumer with the command below.

 [kafka-admin@kafka kafka]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Step 4 : Execute the command below. This sends the message "This is the First Message I am sending" to the test topic, where the consumer picks it up.

 [nodejs-admin@nodejs nodejs]$ node producer_nodejs.js

Step 5 : Check the consumer; you will see the message sent from nodejs. Because of --from-beginning, any messages already in the topic are printed first.

 [kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
 This is a message
 This is another message here 
 This is the First Message I am sending
