Sending JSON -> NodeJS -> Kafka.

What are we trying to achieve?
  1. Send JSON from a browser/curl to nodejs.
  2. nodejs forwards the JSON data to kafka.
  3. Further processing is done on kafka.
  4. We can then see the JSON arrive in the output of the kafka-console-consumer.sh script.

Step 1 : Create a script called json_nodejs_kafka.js with the contents below.

/*
    Getting some 'http' power
*/
var http=require('http');

/*
    Where we expect the request to arrive:
    http://localhost:8125/upload
    Kept for reference only; the server below does not read this object.
*/
var endpoint = {
        hostname: 'localhost',
        port: 8125,
        path: '/upload',
        method: 'POST'
};

/*
    Lets create a server to wait for request.
*/
http.createServer(function(request, response)
{
    /*
        Tell the client that our response is JSON.
    */
    response.writeHead(200, {"Content-Type": "application/json"});

    /*
        request.on waiting for data to arrive.
    */
    request.on('data', function (chunk)
    {

        /*
            Chunk which we receive from the client.
            For our request we are assuming it is going to be JSON data.
            We print it here on the console.
        */
        console.log(chunk.toString('utf8'));

        /* 
            Using kafka-node - really nice library
            create a producer and connect to a Zookeeper to send the payloads.
        */
        var kafka = require('kafka-node'),
            Producer = kafka.Producer,
            client = new kafka.Client('kafka:2181'),
            producer = new Producer(client);

        /*
            Creating a payload, which takes the information below:
            'topic'     -->    the topic we have created in kafka.
            'messages'  -->    data which needs to be sent to kafka (JSON in our case).
            'partition' -->    the partition we send the request to.
                            If there are multiple partitions, this is the place
                            to spread requests across them.
        */
        var payloads = [
            { topic: 'test', messages: chunk.toString('utf8'), partition: 0 }
        ];

        /*
            Once the producer is 'ready', send the payload to kafka.
        */
        producer.on('ready', function(){
            producer.send(payloads, function(err, data){
                if (err) { console.error(err); return; }
                console.log(data);
            });
        });

        /*
            Log the error if the producer reports one.
        */
        producer.on('error', function(err){ console.error(err); });

    });
    /*
        End the response right away; it does not wait for kafka.
    */
    response.end();

/*
    Listen on port 8125
*/    
}).listen(8125);
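
The payload comment in the script mentions spreading requests across partitions when a topic has more than one. A minimal sketch of one way to do that, round-robin, assuming the partition count is known up front. The helpers `createPartitionPicker` and `buildPayload` and the count of 3 are hypothetical and not part of kafka-node.

```javascript
// Hypothetical helper: returns a function that cycles through
// partitions 0 .. partitionCount-1 on successive calls.
function createPartitionPicker(partitionCount) {
    var next = 0;
    return function pickPartition() {
        var partition = next;
        next = (next + 1) % partitionCount;
        return partition;
    };
}

// Hypothetical helper: builds a payload in the same shape the script
// passes to producer.send().
function buildPayload(topic, message, partition) {
    return { topic: topic, messages: message, partition: partition };
}

// Assumption: the 'test' topic was created with 3 partitions.
var pickPartition = createPartitionPicker(3);
var payloads = [
    buildPayload('test', '{"username":"xyz"}', pickPartition()),
    buildPayload('test', '{"username":"abc"}', pickPartition())
];
console.log(payloads);
```

In the script, the picker would be created once outside the request handler, so that successive requests land on successive partitions.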

Step 2 : Start the above script on the nodejs server.

[nodejs-admin@nodejs nodejs]$ vim json_nodejs_kafka.js
[nodejs-admin@nodejs nodejs]$ node json_nodejs_kafka.js

Step 3 : Execute the curl command to send the JSON to nodejs.

[nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" -d '{"username":"xyz","password":"xyz"}' http://localhost:8125/upload
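
The script forwards each chunk as it arrives and only assumes that the body is JSON. A larger body may arrive in several chunks, so one possible refinement is to collect the chunks and check that the whole body parses before sending it on. This is only a sketch: `createJsonBodyCollector` is a hypothetical helper, not something the original script or kafka-node provides.

```javascript
// Hypothetical helper: gathers request chunks and, once the request
// ends, returns the body only if it parses as JSON.
function createJsonBodyCollector() {
    var chunks = [];
    return {
        // Call from request.on('data', ...).
        add: function (chunk) {
            chunks.push(Buffer.from(chunk));
        },
        // Call from request.on('end', ...). Returns the body text,
        // or null when the body is not valid JSON.
        finish: function () {
            var body = Buffer.concat(chunks).toString('utf8');
            try {
                JSON.parse(body);
                return body;
            } catch (err) {
                return null;
            }
        }
    };
}

// Simulate the curl body arriving in two chunks.
var collector = createJsonBodyCollector();
collector.add('{"username":"xyz",');
collector.add('"password":"xyz"}');
console.log(collector.finish()); // prints {"username":"xyz","password":"xyz"}
```

Concatenating the raw buffers before decoding also avoids splitting a multi-byte UTF-8 character across two chunks.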

Step 4 : Output on nodejs console

[nodejs-admin@nodejs nodejs]$ node json_nodejs_kafka.js 
{"username":"xyz","password":"xyz"}
{ test: { '0': 29 } }

{"username":"xyz","password":"xyz"} is the request from the curl command.
{ test: { '0': 29 } } is the response from the kafka cluster, confirming that it has received the JSON.
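
The acknowledgement above can be turned into something easier to log. A small sketch, assuming the acknowledgement is keyed by topic, then by partition number, with the message offset as the value (which is how the output above reads). `describeAck` is a hypothetical helper.

```javascript
// Hypothetical helper: flattens an acknowledgement shaped like
// { topic: { partition: offset } } into one record per partition.
function describeAck(ack) {
    var records = [];
    Object.keys(ack).forEach(function (topic) {
        Object.keys(ack[topic]).forEach(function (partition) {
            records.push({
                topic: topic,
                partition: Number(partition),
                offset: ack[topic][partition]
            });
        });
    });
    return records;
}

// The acknowledgement printed in Step 4.
console.log(describeAck({ test: { '0': 29 } }));
```

Under that assumption, the record for Step 4 reads as topic 'test', partition 0, offset 29.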

Step 5 : Output on the kafka consumer side.

[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
{"username":"xyz","password":"xyz"}

{"username":"xyz","password":"xyz"} is the data received from the nodejs server.

Comments

  1. Need to change line 47 of `json_nodejs_kafka.js` to

    client = new kafka.KafkaClient('kafka:2181'),

    for this to work with kafka-node 6.4.1

  2. In the 'json_nodejs_kafka.js' file, line 47 needs to change to

    client = new Client('kafka:2181'),
