r. alexander milowski, geek

Welcome to the web home of Alex Milowski. Here you'll find information about me, some of the software I've written, and the projects in which I participate. You'll find a variety of mathematics, technology, papers, and presentations on this website written or contributed to by me. If you have any questions or comments, please don't hesitate to contact me.

Recent Entries

Experiments with Big Weather Data in MarkLogic - Doomed Approach

The Naive Approach of just importing the weather reports verbatim works if all you want to do is enumerate a particular weather station's reports by segments of time.  That is, this expression works really well:

/s:aprs/s:report[@from='DW8568' and @received>$start and @received<$end]

If you look at the results of xdmp:plan, you'll see the query plan hits all the indices. I've compared that simple expression with more complicated expressions that use cts:search and the results are effectively the same.  That's good as that means the query optimizer is doing its job.

Unfortunately, there are other queries where this approach fails to perform.  Before I get into that, I need to say what I mean by performance.  My goal is to produce small segments of data as web resources that match certain criteria (e.g. weather reports for the last hour within a geospatial region).  Those resources need to be produced in a reasonably short and constant amount of time; reasonably short means a few seconds--but less is always better.  To scope this a bit more, if you ask for too large a time period or too large a geospatial region, then you've made an unreasonable request.  Further, as the number of reports increases over time, I want the query time to be constant.

The first query that is problematic is the list of station identifiers.  If you were using regular XQuery, you'd try something like this:

for $id in distinct-values(/s:aprs/s:report/@from)
   return element station { $id }

Unfortunately, even though there is an index on s:report/@from, it isn't used in that expression and the performance is terrible.

It is easy to fix that problem by using cts:element-attribute-values(), but what you really want to do is summarize each station, something like this:

for $id in cts:element-attribute-values( xs:QName("s:report"), xs:QName("from"),"","item-frequency")
    (: sort newest first so that [1] picks the most recent report :)
    let $last := (for $r in /s:aprs/s:report[@from=$id] order by $r/@received descending return $r)[1],
        (: most recent report that actually carries a position :)
        $location := (for $r in /s:aprs/s:report[@from=$id][@latitude and @longitude] order by $r/@received descending return $r)[1]
    return element s:station {
       attribute call { $id },
       attribute count {cts:frequency($id)},
       attribute last-received {$last/@received},
       if ($location)
       then ($location/@latitude, $location/@longitude)
       else ()
    }

where we retrieve the last weather report received from the station.  This query doesn't perform very well at all.  As the number of reports in the database increases, searching for the last report for each station identifier just isn't efficient as specified.  From this you can start to see how the Naive Approach starts to fall apart.

The second problem relates to geospatial queries and suffers from the same issues as the number of reports increases.  Since there is a geospatial index on the s:report/@latitude and s:report/@longitude attribute pair, we'd like to be able to search within a certain geospatial region for recent reports.  That query looks something like this:

let $now := current-dateTime(), $start := $now - xs:dayTimeDuration("PT1H")
   for $r in 
      cts:search(/s:aprs/s:report[@received > $start], 
        cts:element-attribute-pair-geospatial-query(xs:QName("s:report"), 
          xs:QName("latitude"), xs:QName("longitude"), cts:circle(5, cts:point(37,-122))))
      order by $r/@received
      return $r

Again, the problem is the location is buried within a large pile of s:report elements.  MarkLogic does produce the query results but not within the performance metrics I'd prefer.  Keep in mind that this is still impressive for the shove-it-in-and-go (Naive) approach: it takes tens of seconds for 30+ million reports.  The problem is that it just isn't set up to perform and the query times increase as the number of reports increases.

As structured, the data is a classic example of denormalized data.  The location and other station summary information is repeated over and over again on the s:report elements.  Even though there are indices set up on @from, @latitude, @longitude, and @received, they won't help over time.  The number of s:report elements will just be overwhelming as we're trying to extract a unique view of the normalized data (i.e. the latest station summary).

The conclusion is that the Naive Approach is doomed for big data.  That's not a huge surprise.  I expected it to fail and I wanted to see where.

MarkLogic is still a database system and, as I've been told, even for them, there isn't a free lunch.  You have to organize your information into rational collections; one giant collection to hold them all just won't scale.  As such, the solution is quite simple: more collections containing different views of the raw data.

As I'm local to MarkLogic, I took this problem as an opportunity to visit the wonderfully helpful folks at MarkLogic.  While I could have made up my own solutions, I wanted to understand what they would recommend.  It was a clean slate and I wasn't committed to any design.  I presented my current work literally as the Naive Approach and talked a bit about my problems.

One recommendation was, for the above queries, to keep a separate collection with an up-to-date summary of each station.  While the database contains many millions of weather reports and that number will grow significantly, there are only about 10,000 stations right now.  The number of stations is unlikely to grow very fast in comparison to the number of reports.  As the weather reports are imported, keep a separate set of station summary documents, in a separate collection, for each of the 10,000 stations, and these problems should go away.  The problems did go away and the query times became constant and fast.  So, the solution was to normalize your data!
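To make the idea concrete, here is a minimal sketch in Python of keeping one summary per station as reports arrive. It is only an illustration under stated assumptions, not the actual import code (which is XProc): the `StationSummary` class and the `update_summaries` function are hypothetical names, and reports are modelled as plain dictionaries keyed by the attribute names used in the XML.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional


@dataclass
class StationSummary:
    """Hypothetical in-memory stand-in for one station summary document."""
    call: str
    count: int = 0
    last_received: str = ""
    position_received: str = ""
    latitude: Optional[float] = None
    longitude: Optional[float] = None


def update_summaries(summaries: Dict[str, StationSummary],
                     reports: Iterable[dict]) -> None:
    """Fold a batch of reports into the per-station summaries."""
    for report in reports:
        call = report["from"]
        summary = summaries.setdefault(call, StationSummary(call))
        summary.count += 1
        received = report["received"]
        # UTC timestamps in one fixed ISO 8601 layout sort correctly as strings.
        if received >= summary.last_received:
            summary.last_received = received
        # Keep the most recent position seen, even if later reports lack one.
        has_position = "latitude" in report and "longitude" in report
        if has_position and received >= summary.position_received:
            summary.position_received = received
            summary.latitude = float(report["latitude"])
            summary.longitude = float(report["longitude"])
```

The work per incoming report is constant, and a query over the summaries touches one entry per station rather than every report ever received, which is why the query time stops growing with the size of the archive.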

In relational databases, you often denormalize your data to improve performance.  In this case, the solution was to actually normalize your data.  That isn't a ground-shaking revelation.  You wouldn't want only one large table of reports in a relational database either.  The difference is that you can get away with a lot within MarkLogic without having to think about information organization.  That is, until things go wrong.

The nice bit about this change is that in the cts:search queries, only the target nodes have to change:

collection("http://weather.milowski.com/stations/")//s:position

and the enumeration of stations becomes a document enumeration:

collection("http://weather.milowski.com/stations/")/s:station/@id

Best of all, this was an easy change to implement.  Why?  I'm using XProc and all I had to do was make a simple change to my import pipeline.  I'll post that bit next.  

Experiments with Big Weather Data in MarkLogic - The Naive Approach

I've heard over and over that MarkLogic is a fantastic XML database--you just import your documents and query away!  Given the quality of the people that I personally know at MarkLogic, I'm sure that's true.  Still, I wanted to put that to the test.  Every database system has techniques for getting reasonable or blindingly fast performance and I wanted to see how that works and at what cost.

The process that receives the APRS weather reports from the CWOP produces data in five-minute segments, stored in separate documents, from each of the three APRS-IS servers.  Each document produced is between 130K and 430K in size and contains between 450 and 1600 weather reports.  Each hour, 36 of those documents are produced.  On average, about 17MB per hour, or 12GB per month, of data is produced.
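The document count and the monthly volume follow from simple arithmetic. A small Python check, assuming a 30-day month and decimal units (1 GB = 1000 MB), neither of which is stated above:

```python
SEGMENT_MINUTES = 5   # each document covers a five-minute segment
SERVERS = 3           # one document per APRS-IS server per segment
MB_PER_HOUR = 17      # average volume quoted above

# 12 segments per hour from each of 3 servers
documents_per_hour = (60 // SEGMENT_MINUTES) * SERVERS

# Assumption: 30-day month, 1 GB = 1000 MB
gb_per_month = MB_PER_HOUR * 24 * 30 / 1000

print(documents_per_hour)       # 36
print(round(gb_per_month, 2))   # 12.24
```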

The documents produced have the following structure:

<aprs xmlns="http://weather.milowski.com/V/APRS/" source="cwop.tssg.org" start="2012-04-11T16:03:29Z">
<report from="CW1367" type="weather" latitude="40.7" longitude="-74.2" 
received="2012-04-11T16:03:29Z" at="2012-04-13T17:12:00Z" 
wind-dir="340" wind-speed="9" wind-gust="14" 
temperature="53" rain-hour="0" rain-midnight="0" humidity="40" pressure="10090" />
...
</aprs>

The naive approach is to just import these documents verbatim into one large collection and set up a few basic indices:

  • an xs:string index on s:report/@from,
  • an xs:dateTime index on s:report/@received,
  • a geospatial index on the attribute pair s:report/@longitude and s:report/@latitude.

My tool of choice for importing these documents is XProc.  The pipeline is rather simple:

<p:declare-step xmlns:p="http://www.w3.org/ns/xproc"
                xmlns:c="http://www.w3.org/ns/xproc-step"
                xmlns:ml="http://xmlcalabash.com/ns/extensions/marklogic"
                xmlns:s="http://weather.milowski.com/V/APRS/"
                version="1.0"
                name="insert-weather">
<p:option name='xdb.user'/>
<p:option name='xdb.password'/>
<p:option name='xdb.host'/>
<p:option name='xdb.port'/>
<p:input port="source"/>

<p:import href="http://xmlcalabash.com/extension/steps/library-1.0.xpl"/>

<p:delete match="/s:aprs/s:report[@type='encoded']"/>

<p:delete match="/s:aprs/s:report[@error]"/>

<ml:insert-document name="insert" collections="http://weather.milowski.com/weather/">
   <p:with-option name='user' select='$xdb.user'/>
   <p:with-option name='password' select='$xdb.password'/>
   <p:with-option name='host' select='$xdb.host'/>
   <p:with-option name='port' select='$xdb.port'/>
   <p:with-option name="uri" 
                  select="concat('http://weather.milowski.com/',/s:aprs/@source,'-',/s:aprs/@start,'.xml')"/>
</ml:insert-document>
</p:declare-step>

You'll notice that I decided to remove errors and encoded data from the original source.  Errors are reports I couldn't parse according to the APRS rules and encoded data are non-standard encodings of weather data.  These turn out to be very small percentages of the actual received data.  That is, almost everyone is producing data correctly by the standardized set of rules.

To apply that pipeline over and over again to the documents, I wrote a little daemon process that uses Calabash's API to run the pipeline.  The daemon waits for the files to show up in a particular directory, applies the pipeline to documents it discovers, and then moves them to an archive directory. That allows me to re-process them in the future if I change my mind--which I did.
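The watch, process, and archive loop is easy to picture in code. The following Python sketch is only an illustration of that loop under stated assumptions; the real daemon drives the pipeline through Calabash's API, whereas here `run_pipeline` is a hypothetical callback standing in for that step, and the `*.xml` file pattern and polling interval are assumptions as well.

```python
import shutil
import time
from pathlib import Path
from typing import Callable


def process_pending(incoming: Path, archive: Path,
                    run_pipeline: Callable[[Path], None]) -> int:
    """Run the pipeline on every waiting document, then archive it.

    Returns the number of documents processed.
    """
    archive.mkdir(parents=True, exist_ok=True)
    processed = 0
    for document in sorted(incoming.glob("*.xml")):
        run_pipeline(document)  # hypothetical stand-in for the XProc run
        # Archive instead of deleting so documents can be re-processed later.
        shutil.move(str(document), str(archive / document.name))
        processed += 1
    return processed


def watch(incoming: Path, archive: Path,
          run_pipeline: Callable[[Path], None],
          poll_seconds: float = 30.0) -> None:
    """Poll the incoming directory forever (assumed polling interval)."""
    while True:
        process_pending(incoming, archive, run_pipeline)
        time.sleep(poll_seconds)
```

Because a document is moved only after the pipeline returns, a failed run leaves it in the incoming directory to be retried on the next pass.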

A bit of foreshadowing never hurts.  I'll post more on the results of this later.  I've got some queries to write.

Experiments with Big Weather Data in MarkLogic - Introduction

Over the past couple of months, I've been experimenting with big data on the web for scientific purposes. The goal is to take my research on geospatial scientific data on the web and use MarkLogic to create a repository for large sensor data. My current scientific area of focus is weather data (sensor data in general) that I'm collecting through the Citizen Weather Observer Program (CWOP).

The data comes to me over the Internet via APRS-IS, which is a home-grown peer-to-peer message relaying system.  The messages originate from both Internet and radio-based systems. Typically, the sending systems are weather stations or location trackers, but they could be just about anything. The messages are cryptic character-based packets encoding data.

All you really want to know is that:

DW7508>APRS,TCPXX*,qAX,CWOP-2:@140818z5303.00N/00414.20W_290/011g018t040r000P000h77b09878eCumulusDsVP
DW4331>APRS,TCPXX*,qAX,CWOP-2:@140818z3244.80N/11708.33W_137/000g...t046r...p...P000h96b10083.DsVP
DW4314>APRS,TCPXX*,qAX,CWOP-2:@140818z3835.73N/12057.58W_072/000g001t040r000p000P000h71b10214.DsVP
KF9X>APRS,TCPXX*,qAX,CWOP-2:@140819z4420.99N/08952.58W_080/001g004t034r001P001p001h91b10208v6

gets turned into XML:

<report from="dw3512" type="weather" latitude="44.903" longitude="-85.06833" 
received="2012-04-10T08:00:00Z" at="2012-04-10T08:00:00Z" wind-dir="303" wind-speed="6" wind-gust="13"
temperature="33" rain-hour="0" rain-midnight="0" humidity="81" pressure="10098" />
<report from="DW6820" type="weather" latitude="54.37533" longitude="2.89533"
received="2012-04-10T08:00:00Z" at="2012-04-10T08:00:00Z" wind-dir="272" wind-speed="9" wind-gust="19"
temperature="43" rain-hour="1" rain-24hours="56" rain-midnight="17" humidity="78" pressure="9662" />
<report from="DW2039" type="weather" latitude="51.24533" longitude="-2.94883"
received="2012-04-10T08:00:00Z" at="2012-04-10T07:59:00Z" wind-dir="237" wind-speed="6" wind-gust="12"
temperature="44" rain-hour="0" rain-24hours="43" rain-midnight="1" humidity="88" pressure="9898" />

Note: there is no correspondence for the above examples, so don't try to parse the APRS messages to produce my XML.
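For a feel of what the conversion involves, here is a minimal Python sketch that decodes only the position field of a packet. It assumes the uncompressed APRS position layout (ddmm.mm followed by N or S, a separator, then dddmm.mm followed by E or W) and is not the converter that produced the XML above; `parse_position` is a hypothetical name, and its results are not expected to match the XML samples.

```python
import re
from typing import Optional, Tuple

# Uncompressed APRS position: ddmm.mmN/dddmm.mmW (degrees plus decimal minutes)
POSITION = re.compile(r"(\d{2})(\d{2}\.\d{2})([NS])/(\d{3})(\d{2}\.\d{2})([EW])")


def parse_position(packet: str) -> Optional[Tuple[float, float]]:
    """Return (latitude, longitude) in decimal degrees, or None if absent."""
    match = POSITION.search(packet)
    if match is None:
        return None
    lat_deg, lat_min, lat_hem, lon_deg, lon_min, lon_hem = match.groups()
    latitude = int(lat_deg) + float(lat_min) / 60
    longitude = int(lon_deg) + float(lon_min) / 60
    # Southern and western hemispheres are negative in decimal degrees.
    if lat_hem == "S":
        latitude = -latitude
    if lon_hem == "W":
        longitude = -longitude
    return round(latitude, 5), round(longitude, 5)


packet = ("DW7508>APRS,TCPXX*,qAX,CWOP-2:@140818z5303.00N/00414.20W"
          "_290/011g018t040r000P000h77b09878eCumulusDsVP")
print(parse_position(packet))  # (53.05, -4.23667)
```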

Through the CWOP APRS-IS servers, more than 55,000 weather reports per hour are aggregated and available to be received.  I set up a process to receive these messages, turn them into XML, and dump them onto disk as XML documents in 5-minute segments.  I have that running now and I expect it to generate about 8-12 GB of raw XML data a month.

The goal is to load this data into MarkLogic, understand how to store such data, and expose the data on the web as a useful archive of sensor data.
