Monday, 27 July 2015

Transactional Layer For Apache Cassandra

As you probably know, Apache Cassandra is one of the most popular NoSQL databases. It is a wide-column store that is often used for time-series data, and it scales well.

Last August, we discussed replacing our RDBMS with a NoSQL database, because the RDBMS could not scale enough to meet our customers' requirements.

Obviously, this is not as simple as swapping MySQL for Oracle (or vice versa). You have to change a lot inside your application.

First, you have to change the data model. You also have to change your application's transaction management, since NoSQL databases offer no transactions (or only lightweight ones).

So I started to develop a transactional layer for Apache Cassandra.

I have to confess that transactions on NoSQL may not fit NoSQL principles, and they won't resolve most of the problems of switching to NoSQL. This is an initial step toward narrowing the gap between the enterprise world and the NoSQL world.

It will be a simple layer that:
  • Accepts CQL statements through a RESTful interface.
  • Keeps the transaction context in the same Cassandra database, but in temporary tables.
  • Applies the changes on commit (or discards them on rollback).
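
To make the idea concrete, here is a toy in-memory sketch of the staging approach described above. It is not cql-tx's actual code: the class name StagingTxLayer and its methods are made up for illustration, and plain Maps stand in for the Cassandra tables and their temporary tx_ counterparts.

```javascript
// Toy model of "stage writes, apply on commit". All names are hypothetical.
class StagingTxLayer {
  constructor() {
    this.tables = new Map(); // committed data: table name -> Map(key -> row)
    this.staged = new Map(); // open transactions: txId -> list of pending writes
    this.nextId = 1;
  }

  // Start a transaction and return its id.
  openTransaction() {
    const txId = 'tx-' + this.nextId++;
    this.staged.set(txId, []);
    return txId;
  }

  // Record a write in the staging area (the role a temporary table plays).
  insert(txId, table, key, row) {
    const pending = this.staged.get(txId);
    if (!pending) throw new Error('unknown transaction: ' + txId);
    pending.push({ table, key, row });
  }

  // Copy every staged write into the real table, then drop the staging entries.
  commit(txId) {
    const pending = this.staged.get(txId);
    if (!pending) throw new Error('unknown transaction: ' + txId);
    for (const { table, key, row } of pending) {
      if (!this.tables.has(table)) this.tables.set(table, new Map());
      this.tables.get(table).set(key, row);
    }
    this.staged.delete(txId);
  }

  // Throw the staged writes away without touching the real tables.
  rollback(txId) {
    if (!this.staged.delete(txId)) throw new Error('unknown transaction: ' + txId);
  }

  // Read committed data only; staged writes stay invisible.
  read(table, key) {
    const rows = this.tables.get(table);
    return rows ? rows.get(key) : undefined;
  }
}
```

A row inserted through insert() only becomes visible to read() after commit(); after rollback() it never shows up.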
Let's walk through an example to explain how cql-tx (the name of the transactional layer) works.

  • Install the cql-tx module:
          npm install cql-tx 

  • Go to the cql-tx folder under node_modules and update config.js with your logging path and Cassandra configuration. (You may also want to change the ports of the REST and WebSocket interfaces.)

var config = {
        restPort : 8080,
        websocketPort : 8081,
        trxClearanceIntervalInMins : 10,
        keyspace : 'mykeyspace',
        // contactPoints is empty in this listing; put your Cassandra hosts here
        cassandraClient : {contactPoints: [''], keyspace: 'mykeyspace'},
        timezone : '+0300',
        winstonTransports : function(winston){
                            return [
                              new winston.transports.File({
                                  level: 'debug',
                                  filename: '/data/cqltx/logs/all-logs.log',
                                  handleExceptions: true,
                                  json: true,
                                  maxsize: 5242880, //5MB 
                                  maxFiles: 5,
                                  colorize: false
                              }),
                              new winston.transports.Console({
                                  level: 'debug',
                                  handleExceptions: true,
                                  json: false,
                                  colorize: true
                              })
                            ];
        }
};

  • cql-tx keeps transaction and table metadata in its own tables, so you need to run init.js to create these metadata tables:
              node init.js

  • You are now ready to start cql-tx:
              node start cql-tx

  • For our scenario, create a table called users:

        create table users(fname text,lname text,user_id bigint,primary key (user_id));
  • Open a transaction with the following curl command:
curl -H "Content-Type: application/json" -X POST -d '{"commandType" : "openTransaction" }' http://localhost:8080/cqltx
  • Execute an insert statement with the following curl command (replace the transaction id with the value returned by openTransaction):
curl -H "Content-Type: application/json" -X POST -d '{"commandType" : "execute", "txId" : "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxx" ,"cql":"insert into users(user_id) values(64222)" }' http://localhost:8080/cqltx

  • At this point, check your users table and the tx_users table:
              select * from users;
              select * from tx_users;

          You will see that your entry does not exist in users yet, while it does exist in tx_users.
  • Commit the change (again, replace the transaction id):
curl -H "Content-Type: application/json" -X POST -d '{"commandType" : "commitTransaction", "txId" : "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxx" }' http://localhost:8080/cqltx

  • Check the users and tx_users tables again:
              select * from users;
              select * from tx_users;

          You will see that your entry no longer exists in tx_users, while it now exists in users. The change is committed.
  • If you want to roll back instead of committing (again, replace the transaction id):
curl -H "Content-Type: application/json" -X POST -d '{"commandType" : "rollbackTransaction", "txId" : "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxx" }' http://localhost:8080/cqltx
  • If a transaction is not finalized within the clearance period (defined as trxClearanceIntervalInMins in config.js), it is rolled back automatically.
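
The curl steps above can also be scripted. The following is a minimal sketch for Node 18 or later (it relies on the built-in fetch). The helper names buildCommand, sendCommand and inTransaction are made up for this example, and I assume that the openTransaction reply is JSON with a txId field; adjust that line if your reply looks different.

```javascript
const ENDPOINT = 'http://localhost:8080/cqltx';

// Build the JSON body for one command, mirroring the curl payloads above.
function buildCommand(commandType, txId, cql) {
  const body = { commandType };
  if (txId !== undefined) body.txId = txId;
  if (cql !== undefined) body.cql = cql;
  return JSON.stringify(body);
}

// POST one command to cql-tx and return the parsed JSON reply.
async function sendCommand(commandType, txId, cql) {
  const res = await fetch(ENDPOINT, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: buildCommand(commandType, txId, cql),
  });
  if (!res.ok) throw new Error('cql-tx returned HTTP ' + res.status);
  return res.json();
}

// Run all statements in one transaction; roll back if any step fails.
async function inTransaction(statements, send = sendCommand) {
  const opened = await send('openTransaction');
  const txId = opened.txId; // ASSUMPTION: field name of the returned transaction id
  try {
    for (const cql of statements) {
      await send('execute', txId, cql);
    }
    await send('commitTransaction', txId);
  } catch (err) {
    await send('rollbackTransaction', txId);
    throw err;
  }
  return txId;
}
```

For example, inTransaction(['insert into users(user_id) values(64222)']) would send the same three requests as the curl walkthrough: open, execute, commit.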

Java Driver for cql-tx

You can use cqltx-rest-driver on Java platforms. It communicates with cql-tx over the RESTful interface. (download)

Usage for users table:

CqlSessionFactory factory = new CqlSessionFactory(new URL("http://localhost:8080/cqltx"));
CqlSession session = factory.createNewSession();
session.execute("insert into users(user_id,fname,lname) values(44433,'test1','test2')");

cql-tx still has many bugs and open points, and I will keep developing it. I look forward to your feedback.

Saturday, 11 July 2015

Spark on Solaris - Codec Problem

In my project, I need to run Apache Spark on Solaris, just like Hadoop :) (see my previous blog entry).

While running my application (and even the Apache Spark samples), I faced the following error:

Exception in thread "main" java.lang.IllegalArgumentException
at org.apache.spark.io.SnappyCompressionCodec.<init>(CompressionCodec.scala:152)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:80)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1.apply(SparkContext.scala:977)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1.apply(SparkContext.scala:974)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:974)
at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:799)
at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:797)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
at org.apache.spark.SparkContext.textFile(SparkContext.scala:797)
at org.apache.spark.api.java.JavaSparkContext.textFile(JavaSparkContext.scala:184)
at com.ericsson.experimental.spark.jobs.Collaborative.collabFilter(Collaborative.java:162)
at com.ericsson.experimental.spark.jobs.Collaborative.main(Collaborative.java:110)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
The root cause is that the Snappy codec is not available on Solaris. It is possible to install the Snappy codec by following the instructions.

But I couldn't get it working, so I decided to switch to the LZ4 codec with the following parameter:

/usr/local/spark-1.4.0/bin/./spark-submit --conf spark.io.compression.codec=lz4 --class com.sample.experimental.spark.jobs.CollaborativeSample ~/spark-sample-0.0.1-SNAPSHOT.jar hdfs://sparksrv1:19000/in/data

Friday, 10 July 2015

Hadoop on Solaris - Namenode Problem

For a project, we needed to install Hadoop on Solaris servers.

After installation, we faced the following error in the namenode log:

2015-07-06 14:11:14,834 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: Number of transactions: 2 Total time for transactions(ms): 2 Number of transactions batched in Syncs: 0 Number of syncs: 3 SyncTimes(ms): 576
2015-07-06 14:11:14,843 INFO org.apache.hadoop.hdfs.server.namenode.FileJournalManager: Finalizing edits file /data/hdfs_name/current/edits_inprogress_0000000000000000005 -> /data/hdfs_name/current/edits_0000000000000000005-0000000000000000006
2015-07-06 14:11:14,851 INFO org.apache.hadoop.ipc.Server: Stopping server on 19000
2015-07-06 14:11:14,937 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Stopping services started for active state
2015-07-06 14:11:14,937 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Stopping services started for standby state
2015-07-06 14:11:14,953 INFO org.mortbay.log: Stopped HttpServer2$SelectChannelConnectorWithSafeStartup@
2015-07-06 14:11:14,965 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Stopping NameNode metrics system...
2015-07-06 14:11:14,968 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: NameNode metrics system stopped.
2015-07-06 14:11:14,969 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: NameNode metrics system shutdown complete.
2015-07-06 14:11:15,005 FATAL org.apache.hadoop.hdfs.server.namenode.NameNode: Failed to start namenode.
ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.fs.DF.getFilesystem(DF.java:76)
at org.apache.hadoop.hdfs.server.namenode.NameNodeResourceChecker$CheckedVolume.<init>(NameNodeResourceChecker.java:69)
at org.apache.hadoop.hdfs.server.namenode.NameNodeResourceChecker.addDirToCheck(NameNodeResourceChecker.java:165)
at org.apache.hadoop.hdfs.server.namenode.NameNodeResourceChecker.<init>(NameNodeResourceChecker.java:134)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startCommonServices(FSNamesystem.java:1103)
at org.apache.hadoop.hdfs.server.namenode.NameNode.startCommonServices(NameNode.java:629)
at org.apache.hadoop.hdfs.server.namenode.NameNode.initialize(NameNode.java:615)
at org.apache.hadoop.hdfs.server.namenode.NameNode.<init>(NameNode.java:762)
at org.apache.hadoop.hdfs.server.namenode.NameNode.<init>(NameNode.java:746)
at org.apache.hadoop.hdfs.server.namenode.NameNode.createNameNode(NameNode.java:1438)
at org.apache.hadoop.hdfs.server.namenode.NameNode.main(NameNode.java:1504)
2015-07-06 14:11:15,018 INFO org.apache.hadoop.util.ExitUtil: Exiting with status 1
As you can see, the error message is not clear. When we investigated the root cause, we found that Hadoop could not execute the following command:
df -k -P XXXXX

The default df on Solaris does not support the -P parameter, so I made a small patch for it in the hadoop-common library. It is attached.

My colleague's solution is better: he linked the default df command to "/usr/xpg4/bin/df", which supports the -P option. Thanks to Can Sevilmis.
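
To illustrate the idea behind both fixes, here is a small sketch that picks a df binary supporting -P and reads the single data line of its output. It is not the attached patch and not Hadoop's code: dfCommand and parseDf are made-up names, and 'sunos' is the value Node reports in process.platform on Solaris.

```javascript
// Choose the df binary per platform; the XPG4 df on Solaris accepts -P.
function dfCommand(platform, path) {
  const binary = platform === 'sunos' ? '/usr/xpg4/bin/df' : 'df';
  return [binary, '-k', '-P', path];
}

// Parse "df -k -P <path>" output: one header line, then one line per
// file system with six whitespace-separated columns.
function parseDf(output) {
  const lines = output.trim().split('\n');
  if (lines.length < 2) throw new Error('unexpected df output');
  const fields = lines[1].trim().split(/\s+/);
  return {
    filesystem: fields[0],
    totalKb: Number(fields[1]),
    usedKb: Number(fields[2]),
    availableKb: Number(fields[3]),
    mountPoint: fields[5],
  };
}
```

The -P flag matters because it forces this portable one-line-per-file-system layout, which is what makes the output safe to parse.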

Friday, 3 July 2015

Amazon S3 access from Openshift Application

OpenShift is Red Hat's DevOps platform. My personal opinion is that it is simple and will make you more efficient (especially if you are a startup).

OpenShift doesn't provide storage like S3, and in my project I need to access Amazon S3 storage from my OpenShift application.

  • I have an OpenShift application hosted in Amazon US-East.
  • I have S3 buckets under my Amazon account.

Steps to apply:
(Assumption: you have already installed the rhc client for OpenShift.)
  • Define a group for S3 access in the Amazon console (group name: S3FullAccess).
    • Go to the Security Credentials section. You will see the groups on the right.
    • Attach the AmazonS3FullAccess policy to the group.
      • If you don't grant access, you will face the following error:
2015-07-03 07:28:06,231 WARN  [com.amazonaws.services.s3.AmazonS3Client] (default task-1) No content length specified for stream data.  Stream contents will be buffered in memory and could result in out of memory errors.
2015-07-03 07:28:10,542 INFO  [stdout] (default task-1) Caught an AmazonServiceException, which means your request made it to Amazon S3, but was rejected with an error response for some reason.
2015-07-03 07:28:10,543 INFO  [stdout] (default task-1) Error Message:    The request signature we calculated does not match the signature you provided. Check your key and signing method. (Service: Amazon S3; Status Code: 403; Error Code: SignatureDoesNotMatch; Request ID: B424D1235CF7B2C4)
2015-07-03 07:28:10,543 INFO  [stdout] (default task-1) HTTP Status Code: 403
2015-07-03 07:28:10,544 INFO  [stdout] (default task-1) AWS Error Code:   SignatureDoesNotMatch
2015-07-03 07:28:10,544 INFO  [stdout] (default task-1) Error Type:       Client
2015-07-03 07:28:10,545 INFO  [stdout] (default task-1) Request ID:       B424D1235CF7B2C4

  • Define a user and add it to the S3FullAccess group.
  • Go to the newly created user and generate AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY under the Security Credentials section.
  • Set the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY:
    • rhc env-set AWS_ACCESS_KEY_ID='AKIAIXXXXXXHCK2Q' -a maurice
    • rhc env-set AWS_SECRET_ACCESS_KEY='ETPvP3JKtXXXXXXXmsn91l+o17QJt' -a maurice
  •  Add the following dependency for Amazon S3 objects.

  • Write a servlet that will accept the posted data and write to S3 as a file.
package com.example.web.servlet;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;

/**
 * Servlet implementation class S3EventListener.
 * The servlet mapping (annotation or web.xml entry) is not shown in this listing.
 */
public class S3EventListener extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#HttpServlet()
     */
    public S3EventListener() {
        super();
    }

    /**
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse response)
     */
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        response.getWriter().append("Get Method is not supported.");
    }

    /**
     * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse response)
     */
    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        // Credentials are read from the environment variables set with rhc env-set.
        AmazonS3 s3client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider());
        try {
            System.out.println("Uploading a new object to S3 from a post\n");
            BufferedReader reader = request.getReader();
            // Only the first line of the posted body is stored; an empty body becomes an empty object.
            String line = reader.readLine();
            byte[] payload = (line == null ? "" : line).getBytes("UTF-8");
            ByteArrayInputStream baiStream = new ByteArrayInputStream(payload);
            // Setting the content length avoids the "No content length specified" warning.
            ObjectMetadata metadata = new ObjectMetadata();
            metadata.setContentLength(payload.length);
            s3client.putObject(new PutObjectRequest(
                                     "sample-bucket", "sample-file", baiStream, metadata));

        } catch (AmazonServiceException ase) {
            System.out.println("Caught an AmazonServiceException, which " +
                    "means your request made it " +
                    "to Amazon S3, but was rejected with an error response" +
                    " for some reason.");
            System.out.println("Error Message:    " + ase.getMessage());
            System.out.println("HTTP Status Code: " + ase.getStatusCode());
            System.out.println("AWS Error Code:   " + ase.getErrorCode());
            System.out.println("Error Type:       " + ase.getErrorType());
            System.out.println("Request ID:       " + ase.getRequestId());
        } catch (AmazonClientException ace) {
            System.out.println("Caught an AmazonClientException, which " +
                    "means the client encountered " +
                    "an internal error while trying to " +
                    "communicate with S3, " +
                    "such as not being able to access the network.");
            System.out.println("Error Message: " + ace.getMessage());
        }
        // Kept from the generated stub: doGet writes the response body.
        doGet(request, response);
    }
}