Creating Jobs

Jobs are defined in .dy.yaml files in the jobs folder. Each job is composed of several steps that activate blocks. A block defines the business logic of an action. For example, a block can write to a Kafka stream, read from a cloud API, transform the structure of a message, or enrich a message with external data. A step activates a block with a set of parameters.

Overview of the Job YAML Structure

Each Job must start with a block that either produces data or accepts data from external sources. Each subsequent block receives the output of the previous step as its input, so data is streamed through the chain of blocks.
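
For example, a minimal job skeleton using the blocks from the tutorial below might look like this (the connection names are placeholders defined in the following sections):

steps:
# the first block produces data or accepts it from an external source
- uses: redis.read_stream
  with:
    connection: stream
    stream_name: emp
# each subsequent block receives the previous step's output as its input
- uses: relational.write
  with:
    connection: hr
    schema: public
    table: emp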

Launch the DataYoga Runner

The DataYoga Runner is a processing engine that runs Jobs. A Job is either Batch or Streaming, depending on the type of input block used in the Job. The Runner provides:

  • Validation
  • Error handling
  • Metrics and observability
  • Credentials management

The Runner supports multiple processing strategies, including:

  • Stream processing
  • Parallelism
  • Buffering
  • Rate limiting

It supports async processing, multi-threading, and multi-processing to enable maximum throughput with a low footprint.

To deploy a job to the DataYoga Runner, use the DataYoga CLI:

datayoga run jobname

Tutorial - a Job that Reads from Redis and Writes to Postgres

Set up Environment

For this tutorial, set up two containers:

  • A Redis source:
docker run -d -p 6379:6379 redis
  • A Postgres target:
docker run -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword -d postgres
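
Before continuing, verify that both containers are up. The check below assumes the redis-cli client is installed on the host:

docker ps
redis-cli -h localhost -p 6379 ping
# expected reply: PONG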

Define Connections

DataYoga manages connections in a special file named connections.dy.yaml. Each connection is defined with a logical name and the properties needed to connect. Environment variable references, interpolation, and secrets are supported.
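
For example, a credential can be pulled from an environment variable instead of being stored inline. This is a sketch; the ${...} interpolation syntax is an assumption here, so check the DataYoga connections reference for the exact form:

- name: hr
  type: postgresql
  host: localhost
  port: 5432
  database: postgres
  user: postgres
  # assumed syntax for referencing the POSTGRES_PASSWORD environment variable
  password: ${POSTGRES_PASSWORD}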

Add connections for the Redis and Postgres containers above to connections.dy.yaml:

cat << EOF > connections.dy.yaml
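# the Postgres target container started above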
- name: hr
  type: postgresql
  host: localhost
  port: 5432
  database: postgres
  user: postgres
  password: mysecretpassword
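# the Redis source container started above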
- name: stream
  type: redis
  host: localhost
  port: 6379
EOF

Create the Job

cat << EOF > redis_to_pg.dy.yaml
steps:
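# source: read messages from the 'emp' Redis stream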
- uses: redis.read_stream
  with:
    connection: stream
    stream_name: emp
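# add a computed full_name field using a JMESPath expression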
- uses: add_field
  with:
    fields:
      - field: full_name
        language: jmespath
        expression: concat([fname, ' ', lname])
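# map each record to the target columns using SQL expressions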
- uses: map
  with:
    expression:
      {
        first_name: fname,
        last_name: lname,
        country: country_code || ' - ' || UPPER(country_name),
        full_name: full_name,
        greeting: "'Hello ' || CASE WHEN gender = 'F' THEN 'Ms.' WHEN gender = 'M' THEN 'Mr.' ELSE 'N/A' END || ' ' || full_name"
      }
    language: sql
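# write the results to Postgres, creating the table if it does not exist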
- uses: relational.write
  with:
    connection: hr
    schema: public
    table: emp
    create: true
EOF
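
The expressions above expect records with fname, lname, country_code, country_name, and gender fields. Add a test record to the emp stream with redis-cli so the job has something to process:

redis-cli XADD emp '*' fname John lname Doe country_code US country_name usa gender M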

Run the Job in the DataYoga Runner

datayoga run redis_to_pg
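
After the job processes the record, inspect the result in Postgres with the psql client (the emp table is created by the job because create is set to true):

PGPASSWORD=mysecretpassword psql -h localhost -U postgres -c 'select * from emp'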