Saturday, September 10, 2016

[Solved] Fixing external network access in Hortonworks Sandbox HDP 2.4 - DNS NAT on VirtualBox

Hi,
just in case this is of help to anyone.

I installed HDP Sandbox 2.4 for VirtualBox locally on my computer, which sits on a company LAN.

All HDP services worked fine. However, I could not get an internet connection from the VM (e.g. to install packages, or to compile projects that download dependencies). It turned out to be a name resolution issue.

[root@sandbox tests]# ping google.com
ping: unknown host google.com
 
[root@sandbox tests]# ping 216.58.201.142  
PING 216.58.201.142 (216.58.201.142) 56(84) bytes of data.
64 bytes from 216.58.201.142: icmp_seq=1 ttl=53 time=15.4 ms
64 bytes from 216.58.201.142: icmp_seq=2 ttl=53 time=12.4 ms
...

216.58.201.142 being the IP of google.com, as resolved from my local machine.

HDP ships its /etc/resolv.conf with an entry pointing to the well-known public Google DNS, 8.8.8.8.
It so happens that this is not reachable from my network, for some reason I still don't know.
What you can do is force HDP to use the IP of your LAN's DNS server.

You can get that IP with a simple nslookup from your local machine.

$ nslookup.exe google.com
Non-authoritative answer:
Server:  your.nameserver.com
Address:  x.y.z.a
 
 
Name:    google.com
Addresses:  (google IPs)

What you need is your nameserver's address, x.y.z.a. Add it as a "nameserver x.y.z.a" line to /etc/resolv.conf in your HDP Sandbox VM and you should be all set.
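
As an illustration of the change, here is a minimal Python sketch that puts a given DNS server first in the contents of a resolv.conf file. The function name and the address 10.0.0.53 are made up for the example (use your own x.y.z.a), and the sketch only transforms text; writing the result back to /etc/resolv.conf is left out.

```python
def prepend_nameserver(resolv_conf_text, dns_ip):
    """Return resolv.conf contents with `dns_ip` as the first nameserver."""
    new_entry = f"nameserver {dns_ip}"
    # Drop any existing entry for the same IP so it is not listed twice.
    kept = [line for line in resolv_conf_text.splitlines()
            if line.split() != ["nameserver", dns_ip]]
    # Insert before the first existing nameserver line, or append if none.
    for i, line in enumerate(kept):
        if line.strip().startswith("nameserver"):
            kept.insert(i, new_entry)
            break
    else:
        kept.append(new_entry)
    return "\n".join(kept) + "\n"


# 10.0.0.53 is a placeholder for your LAN's DNS server (x.y.z.a above).
original = "search localdomain\nnameserver 8.8.8.8\n"
updated = prepend_nameserver(original, "10.0.0.53")
print(updated)
```

The resolver tries nameservers in the order they are listed, which is why the LAN server goes before 8.8.8.8 rather than after it.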

From there you can test it by e.g. pinging google.com again.

Monday, September 5, 2016

Testing Alluxio

Lately I have been testing Alluxio, formerly Tachyon, a tool I plan to use for keeping a large set of objects in memory. These objects may receive a limited number of ad-hoc, asynchronous, daily modifications, and are (mostly) processed with Spark over HDFS.

Looking at feedback from other companies (the quite hyped Baidu and Barclays examples, among others), it seems like Alluxio is a good fit for this problem. The Barclays case is especially relevant, since their architecture sounds like a common pattern out there and is similar to one I have actually come across.

However, there are some other contenders too, and I've been impressed by Apache Ignite, a whole suite for all things grid. It has an astonishing set of features, but especially the IgniteRDD (Spark Shared RDD) has caught my attention. Several modes of operation... very interesting. This would make for another testing effort, especially because its architecture (and set of use cases) seems to differ largely from that of Alluxio.

Also, maybe more general-purpose tools like Redis would be good enough to do the job. Alluxio's integration with HDFS made it our first point of contact for attacking the problem, but that certainly does not mean it has to be the best option. A recent article on the benefits of using Spark with Redis for time series computation reported that Redis accelerated plain Spark over 100 times, and Spark-with-Alluxio over 45 times (which also means that Spark with Alluxio would only be about 100/45 ≈ 2.2 times as fast as Spark alone... which sounds like too small a number).

Let's see how it goes...

Tuesday, September 29, 2015

Benefits of Functional Programming in Big Data

In a recent talk about MongoDB, I saw that some of the attendees were not used to the stream-like, kinda-functional pipeline programming style.

I explained briefly the origins of functional programming and some reasons why it is in fashion lately even in programming languages that don't have a functional nature - like Java itself. But I neglected to give some of the reasons why this is a stable trend.

So, I'd like to point out some benefits of functional programming. I won't overthink: I took them from The Book.

Purely functional functions (or expressions) have no side effects (memory or I/O). This means that pure functions have several useful properties, many of which can be used to optimize the code:
  • If the result of a pure expression is not used, it can be removed without affecting other expressions.
  • If a pure function is called with arguments that cause no side-effects, the result is constant with respect to that argument list (sometimes called referential transparency), i.e. if the pure function is again called with the same arguments, the same result will be returned (this can enable caching optimizations such as memoization).
  • If there is no data dependency between two pure expressions, then their order can be reversed, or they can be performed in parallel and they cannot interfere with one another (in other terms, the evaluation of any pure expression is thread-safe).
  • If the entire language does not allow side-effects, then any evaluation strategy can be used; this gives the compiler freedom to reorder or combine the evaluation of expressions in a program (for example, using deforestation).
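
To make two of these properties concrete, here is a small Python sketch (my own illustration, not taken from The Book). It assumes two toy pure functions, slow_square and normalise, and shows memoization with functools.lru_cache plus a parallel map that gives the same result as the sequential one.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache


@lru_cache(maxsize=None)
def slow_square(n):
    """Pure: the result depends only on n, so it can safely be cached."""
    return n * n


first = slow_square(12)
second = slow_square(12)               # same arguments: served from the cache
hits = slow_square.cache_info().hits   # number of calls answered by the cache


def normalise(word):
    """Pure: no shared state is read or written."""
    return word.strip().lower()


words = ["  Spark", "HDFS ", " Alluxio "]
sequential = [normalise(w) for w in words]

# No data dependency between the calls, so they can run in parallel
# without locks; pool.map returns results in input order.
with ThreadPoolExecutor(max_workers=3) as pool:
    parallel = list(pool.map(normalise, words))

print(first, second, hits)
print(sequential == parallel)
```

Neither trick would be safe if the functions wrote to a shared variable or depended on external state.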

Thread-safety is a key concept for scalability: it ensures you can parallelize happily without having to worry about side effects. Scala is a good example of a language designed with this in mind.
Another interesting snippet for Java programmers:

In Java, anonymous classes can sometimes be used to simulate closures;[48] however, anonymous classes are not always proper replacements to closures because they have more limited capabilities.[49] Java 8 supports lambda expressions as a replacement for some anonymous classes.[50] However, the presence of checked exceptions in Java can make functional programming inconvenient, because it can be necessary to catch checked exceptions and then rethrow them—a problem that does not occur in other JVM languages that do not have checked exceptions, such as Scala.

Wednesday, August 19, 2015

RocksDB: how architecture changes with hardware changes (Flash memory)


RocksDB is an embedded database from Facebook designed to take advantage of Flash storage. It's a good example of how software architecture needs to change to cater for new hardware technologies.

Dhruba Borthakur explains it better:

Why do we need an Embedded Database?
Flash is fundamentally different from spinning storage in performance. For the purpose of this discussion, let's assume that a read or write to spinning disk takes about 10 milliseconds while a read or write to flash storage takes about 100 microseconds.  Network latency between two machines remains around 50 microseconds. These numbers are not cast in stone and your hardware could be very different from this one, but these numbers demonstrate the relative differences between two scenarios. What does this have anything to do with application-systems architecture? A client wants to store and access data from a database. There are two alternatives, it can store data on locally attached disks or it can store data over the network on a remote server that have disks attached to it. If we consider latency, then the locally attached disks can serve a read request in about 10 milliseconds. And in the client-server architecture, accessing the same data over a network results in a latency of 10.05 milliseconds, the overhead imposed by the network being only a miniscule 0.5%.  Given this fact, it is easy to understand why a majority of currently-deployed systems use the client-server model of accessing data. (For the purpose of the discussion, I am ignoring network bandwidth limitations).

Now, lets consider the same scenario but with disks replaced by flash drives. A data access in the case of locally attached flash storage is 100 microseconds whereas accessing the same data via the network is 150 micros. Network data access is 50% higher overhead than local data access and 50% is a pretty big number. This means that databases that run embedded within an application could have much lower latency than applications that access data over a network. Thus, the necessity of an Embedded Database.
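
The arithmetic in the quote is easy to check. A quick Python sketch, using only the illustrative latencies quoted above (10 ms disk, 100 µs flash, 50 µs network); the function name is mine:

```python
def network_overhead_pct(local_access_us, network_us=50):
    """Extra latency added by the network, as a percentage of local access time."""
    return 100.0 * network_us / local_access_us


disk_overhead = network_overhead_pct(10_000)  # spinning disk: 10 ms = 10,000 µs
flash_overhead = network_overhead_pct(100)    # flash: 100 µs

print(f"disk:  {disk_overhead}% overhead")
print(f"flash: {flash_overhead}% overhead")
```

The network cost is the same 50 µs in both cases. What changes is how large it is relative to the storage access itself.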

More in the History of RocksDB post (curiously, a one-post blog).

Sunday, April 27, 2014

Storm-YARN at Yahoo! - Realtime streaming data processing over HDFS at Hadoop Summit

At Hadoop Summit last year, Andrew Feng of Yahoo! delivered a presentation on Storm-on-YARN, using as a use case the personalization they apply in their web pages.

Apache Storm is a streaming data processing system aimed at achieving real-time processing over big data. It is fault-tolerant (if any node fails, the task will be respawned on other nodes). We have recently talked about Lambda Architectures (and about Lambdoop at the same time), and about the seminal article by Nathan Marz. Storm is actually led by nathanmarz, and it proposes to form a topology of tasks separated into Spouts (sources of data) and Bolts (processors with inputs and outputs) in a DAG, which reminds me of Apache Tez.

I've found the Storm explanation on the Hortonworks page quite concise, concrete and to the point, so I'll just leave it there if you want to know more about Storm. I also recommend browsing the concepts on the Storm home page, now at Apache Incubator.
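
To give a feel for the Spout/Bolt split, here is a toy Python sketch using plain generators. It is not the Storm API: the function names are made up, it runs in a single process, and it is a linear chain rather than a general DAG. It only shows a source feeding a chain of processors.

```python
from collections import Counter


def sentence_spout():
    """Spout: a source of tuples (a fixed list here instead of a live stream)."""
    yield from ["storm on yarn", "storm at yahoo"]


def split_bolt(sentences):
    """Bolt: consumes sentences and emits individual words."""
    for sentence in sentences:
        yield from sentence.split()


def count_bolt(words):
    """Bolt: consumes words and returns the word counts."""
    return Counter(words)


# Wire the topology: spout -> split bolt -> count bolt.
counts = count_bolt(split_bolt(sentence_spout()))
print(counts)
```

In real Storm each of these stages would run as many parallel tasks spread over the cluster, with the framework moving tuples between them.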



What I actually find interesting in the presentation is the explanation of how Storm works internally, given by way of showing how storm-yarn makes your life easier.

PS. They actually tell everyone that Yahoo! had been running YARN (by July 2013) on a 10,000-node cluster. This is for those who doubt the stability and production-readiness of the system that brings that extra flex to HDFS.

PS2. All due respect, I find Mr. Feng's accent a bit hard to follow, but gotta say that as the presentation advanced I began to like it :)

Thursday, April 24, 2014

Hadoop number of reducers rule of thumb explained in map reduce

Reviewing my documentation for my CCDH exam in a few weeks, I have encountered the topic of the recommended number of reducers.
There is a widely known rule of thumb: 0.95 or 1.75 times (#nodes * mapred.tasktracker.reduce.tasks.maximum).

With the 0.95 factor, all reduce tasks run at the same time, but each one typically takes longer. With the 1.75 factor, each reduce task finishes faster, but some reduce tasks wait in a queue before being executed. Prefer the 1.75 factor for better load balancing.
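
As a worked example of the rule of thumb, here is a small Python sketch. The function name and the cluster size (10 nodes with 2 reduce slots each) are made up for illustration.

```python
import math


def suggested_reducers(nodes, reduce_slots_per_node, factor):
    """Rule of thumb: factor * (#nodes * reduce slots per node), rounded down."""
    total_slots = nodes * reduce_slots_per_node
    # The small epsilon guards against float results such as 18.999999...
    return max(1, math.floor(total_slots * factor + 1e-9))


# Hypothetical cluster: 10 nodes, 2 reduce slots per node = 20 slots.
single_wave = suggested_reducers(10, 2, 0.95)  # fits in the 20 slots at once
two_waves = suggested_reducers(10, 2, 1.75)    # more reducers than slots

print(single_wave, two_waves)
```

With 20 slots, the 0.95 factor leaves one slot spare, while the 1.75 factor schedules 15 reducers that have to wait for a second wave.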

From Cloudera documentation:

Typical way to determine how many Reducers to specify:
– Test the job with a relatively small test data set
– Extrapolate to calculate the amount of intermediate data expected from the 'real' input data.
– Use that to calculate the number of Reducers which should be specified.

However, this doesn't fit all, or even most, problems and environments. Looking for a deeper explanation and hints from industry leaders, I found the following advice from Allen Wittenauer of LinkedIn in this interesting conversation on Quora:

 I tend to tell users that their ideal reducers should be the optimal value that gets them closest to:

- A multiple of the block size
- A task time between 5 and 15 minutes
- Creates the fewest files possible

Anything other than that means there is a good chance your reducers are less than great.  There is a tremendous tendency for users to use a REALLY high value ("More parallelism means faster!") or a REALLY low value ("I don't want to blow my namespace quota!").  Both are equally dangerous, resulting in one or more of:

- Terrible performance on the next phase of the workflow
- Terrible performance due to the shuffle
- Terrible overall performance because you've overloaded the namenode with objects that are ultimately useless
- Destroying disk IO for no really sane reason
- Lots of network transfers due to dealing with crazy amounts of CFIF/MFIF work

Now, there are always exceptions and special cases. One particular special case is that if following that advice makes the next step in the workflow do ridiculous things, then we need to likely 'be an exception' in the above general rules of thumb.
[...]
One other thing to mention:  if you are changing the reducer count just to make the partitioning non-skewy in order to influence the partition, you're just plain "doing it wrong".

There is another issue: being in a shared cluster. If you use the FairScheduler, you can be affected by other jobs' and users' quotas. You'd need to adjust your "#nodes" value accordingly.

More insight from Pere Ferrera from Datasalt:

- # reducers not too few because if they are very big and one fails, retrying it will take much more time than retrying a smaller one.
- # reducers is not generally the problem. If there is one, it is partitioning. It doesn't matter whether you use 100 reducers if only 1 is going to handle all the work.

Two good points. I like the second: partitioning the data correctly is way more important than the number of "slots" where you are sending the data to be reduced. 
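
The second point can be illustrated with a few lines of Python. This is a toy simulation, not Hadoop code: the partitioner mimics hash partitioning (key hash modulo number of reducers) with zlib.crc32, and the key distribution is invented, with one "hot" key making up 99% of the records.

```python
from collections import Counter
import zlib


def partition(key, num_reducers):
    """Hash partitioning: the same key always goes to the same reducer."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def load_per_reducer(keys, num_reducers):
    """Count how many records each reducer would receive."""
    counts = Counter(partition(k, num_reducers) for k in keys)
    return [counts.get(i, 0) for i in range(num_reducers)]


# Invented, heavily skewed input: 990 records share a single key.
skewed_keys = ["hot"] * 990 + [f"key{i}" for i in range(10)]

load = load_per_reducer(skewed_keys, 100)
busiest = max(load)
idle = load.count(0)

print(f"busiest reducer: {busiest} records, idle reducers: {idle} of 100")
```

However many reducers you add, every record with the hot key still lands on one of them, so the job takes as long as that single reducer.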

Tuesday, April 15, 2014

What is Apache Tez? A Hadoop YARN DAG-based framework for speed and flexibility

A very nice presentation on Apache Tez by Bikas Saha, Hadoop and Tez committer currently at Hortonworks, at QCon 2013 in NY. The slides are quite dense, but illuminating about what Tez (and YARN) can offer to the Hadoop ecosystem.

Apache Tez: Accelerating Hadoop Query Processing:
http://www.infoq.com/presentations/apache-tez

Tez is an Apache project that aims to accelerate Hadoop querying using YARN's possibilities. It is a client-side application (more on this later). But it is not an end-user application: it is meant as a foundation for building applications on top of it.

CHARACTERISTICS

Expressive dataflow definition APIs.

It uses sampling to partition data better.

Flexible Input-Output-Processor runtime model

The idea is to have a library of inputs, outputs and processors that can be plugged into one another dynamically to perform the computations.
For example, you can define a mapper as a sequence like this:
INPUT: HDFSInput
PROCESSOR: MapProcessor
OUTPUT: FileSortedOutput
or a reducer as:
INPUT: ShuffleInput
PROCESSOR: ReduceProcessor
OUTPUT: HDFSOutput
This is an expression of what actually happens behind the scenes in MRv1, but made flexible to accommodate other tasks.
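
The pluggable idea can be sketched in a few lines of Python. This is only a toy model of the concept, not the Tez API (which is Java): every function below is a made-up stand-in named after the roles listed above, and the data lives in memory instead of HDFS.

```python
def run_task(read_input, process, write_output):
    """Wire one input, one processor and one output into a task."""
    return write_output(process(read_input()))


# Hypothetical stand-ins for the roles in the text; these are not Tez classes.
def lines_input():
    return ["b a", "a c"]


def map_processor(lines):
    return [(word, 1) for line in lines for word in line.split()]


def sorted_output(pairs):
    return sorted(pairs)


def reduce_processor(pairs):
    totals = {}
    for key, value in pairs:
        totals[key] = totals.get(key, 0) + value
    return totals


def dict_output(totals):
    return dict(totals)


# "Mapper": input -> map processor -> sorted output.
mapped = run_task(lines_input, map_processor, sorted_output)
# "Reducer": reads the mapper's output -> reduce processor -> final output.
reduced = run_task(lambda: mapped, reduce_processor, dict_output)

print(mapped)
print(reduced)
```

Swapping any of the three pieces for another implementation gives a different kind of task without touching run_task, which is the flexibility the model is after.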

Data type agnostic

Nothing enforced. Just bytes. KV, tuples, whatever; it's up to your application.

Simplifying Deployment

Upload the libraries and use the Tez client; nothing needs to be installed.

EXECUTION PERFORMANCE

Performance gains over MapReduce

These come from the fact that the job is a DAG: the write barrier between computations is removed, as is the launch overhead, etc.

Plan reconfiguration at runtime

We can change the topology of the graph at runtime! This means we can choose different topologies depending on e.g. the volume of data: you can change the number of reducers, or make joins in a different order. It really gives a great deal of flex!

Optimal resource management

This is achieved by reusing YARN containers, either for new tasks or to keep shared objects in memory for further task executions.

Dynamic physical flow decisions

Optimizations like "process locally if possible", "can we use in-memory datastore?", etc. This is in the pipeline, but it does open possibilities.