Ingest records from Workers
You can send records to your Pipeline directly from a Cloudflare Worker. To do so, you need to:
- Create a Worker
- Create a Pipeline
- Add your Pipeline as a binding in your Worker's wrangler.jsonc file
- Write your Worker to send records to your Pipeline
- Deploy your Worker
- Verify in R2
Create a Cloudflare Worker if you don't already have one. This Worker will send records to your Pipeline.
To create a Worker, run the following command in a terminal:
    npm create cloudflare@latest -- pipeline-worker

    yarn create cloudflare@latest pipeline-worker

    pnpm create cloudflare@latest pipeline-worker

For setup, select the following options:
- For What would you like to start with?, choose Hello World example.
- For Which template would you like to use?, choose Hello World Worker.
- For Which language do you want to use?, choose TypeScript.
- For Do you want to use git for version control?, choose Yes.
- For Do you want to deploy your application?, choose No (we will be making some changes before deploying).
This will create a new directory, which will include both a src/index.ts Worker script, and a wrangler.jsonc configuration file. Navigate into the newly created directory:
    cd pipeline-worker

Create a new Pipeline, if you don't already have one. If this is your first time using Pipelines, follow the instructions in the get started guide.
By default, Worker bindings are enabled on all Pipelines. Keep track of the name you gave your Pipeline in this stage; we'll use it in the next step.
To connect your Worker to your Pipeline, you need to create a binding. Bindings allow you to grant specific capabilities to your Worker.
Open your newly generated wrangler.jsonc configuration file and add the following:
    {
      "pipelines": [
        {
          "binding": "MY_PIPELINE",
          "pipeline": "<MY-PIPELINE-NAME>"
        }
      ]
    }

If your project uses a wrangler.toml file instead, the equivalent configuration is:

    [[pipelines]]
    binding = "MY_PIPELINE"
    pipeline = "<MY-PIPELINE-NAME>"

Replace <MY-PIPELINE-NAME> with the name of the Pipeline you created in step 2. Next, replace MY_PIPELINE with the name you want for your binding. The binding must be a valid JavaScript variable name. This is the variable you will use to reference this Pipeline in your Worker.
You will now configure your Worker to send records to your Pipeline. Your Worker will:
- Take a request it receives from the browser
- Transform the request to JSON
- Send the resulting record to your Pipeline
In your Worker project directory, open the src folder and add the following to your index.ts file:
    export interface Env {
      // Must match the binding name in your Wrangler configuration
      MY_PIPELINE: Pipeline<any>;
    }

    export default {
      async fetch(req, env, ctx): Promise<Response> {
        // Build a JSON-serializable record from the incoming request
        const record = {
          url: req.url,
          method: req.method,
          headers: Object.fromEntries(req.headers),
        };
        // Send the record to the Pipeline; this throws if the send fails
        await env.MY_PIPELINE.send([record]);
        return new Response('Success');
      },
    } satisfies ExportedHandler<Env>;

Replace MY_PIPELINE with the name of the binding you set in step 3. If sending the record to the Pipeline fails, your Worker will throw an exception and return an error. If sending the record succeeds, it will return Success with an HTTP 200 status code to the browser.
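Because send() takes an array, a single call can carry several records. The sketch below assumes a hypothetical client that POSTs a JSON array of events. The PipelineLike interface and the toRecords and ingestBatch helpers are illustrative names introduced here, not part of the Pipelines API.

```typescript
// Minimal stand-in for the binding type so this sketch type-checks on its own.
// In a real Worker you would use the Pipeline type from your project's types.
interface PipelineLike {
  send(records: object[]): Promise<void>;
}

// Hypothetical helper: keep only plain JSON objects from a parsed request body.
// A single object is treated as a batch of one; strings, numbers, null, and
// nested arrays are dropped.
function toRecords(body: unknown): object[] {
  const items: unknown[] = Array.isArray(body) ? body : [body];
  return items.filter(
    (item): item is object =>
      typeof item === "object" && item !== null && !Array.isArray(item),
  );
}

// Hypothetical helper: send every valid record in one send() call and
// return how many records were sent. Nothing is sent for an empty batch.
async function ingestBatch(
  pipeline: PipelineLike,
  body: unknown,
): Promise<number> {
  const records = toRecords(body);
  if (records.length > 0) {
    await pipeline.send(records);
  }
  return records.length;
}
```

Inside a fetch handler, this could be called as ingestBatch(env.MY_PIPELINE, await req.json()), assuming the request body is valid JSON.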
In a production application, you would likely use a try...catch statement to catch the exception and handle it directly (for example, return a custom error or even retry).
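As a rough illustration of that idea, the sketch below wraps the send in try...catch, retries a few times, and returns a custom error response if every attempt fails. The PipelineLike interface, the sendWithRetry and handleRequest names, the retry count of 3, and the 500 status are all assumptions made for this example. Adjust them to your application.

```typescript
// Minimal stand-in for the binding type so this sketch type-checks on its own.
interface PipelineLike {
  send(records: object[]): Promise<void>;
}

interface Env {
  MY_PIPELINE: PipelineLike;
}

// Hypothetical helper: call send() up to `attempts` times (at least once)
// and rethrow the last error if every attempt fails. There is no delay
// between attempts, for brevity.
async function sendWithRetry(
  pipeline: PipelineLike,
  records: object[],
  attempts = 3,
): Promise<void> {
  let lastError: unknown;
  for (let i = 0; i < Math.max(1, attempts); i++) {
    try {
      await pipeline.send(records);
      return;
    } catch (err) {
      lastError = err;
    }
  }
  throw lastError;
}

async function handleRequest(req: Request, env: Env): Promise<Response> {
  const record = {
    url: req.url,
    method: req.method,
    headers: Object.fromEntries(req.headers),
  };
  try {
    await sendWithRetry(env.MY_PIPELINE, [record]);
    return new Response("Success");
  } catch (err) {
    // Log the failure and return a custom error instead of an uncaught exception
    console.error("Pipeline send failed:", err);
    return new Response("Failed to record request", { status: 500 });
  }
}

export default { fetch: handleRequest };
```

Retrying without a delay keeps the example short. A real application might wait between attempts or cap the total time spent retrying.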
With your wrangler.jsonc file and index.ts file configured, you are ready to publish your Worker. To publish your Worker, run:
    npx wrangler deploy

You should see output that resembles the below, with a *.workers.dev URL by default.

    Uploaded <YOUR-WORKER-NAME> (0.76 sec)
    Published <YOUR-WORKER-NAME> (0.29 sec)
      https://<YOUR-WORKER-NAME>.<YOUR-ACCOUNT>.workers.dev

Copy your *.workers.dev URL and paste it into a new browser tab. Refresh the page a few times to send records to your Pipeline. Your browser should display the Success response after each record is sent to your Pipeline.
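If you would rather script this than refresh a browser tab, a small helper like the one below can send a handful of requests. It is a sketch, not part of the Pipelines tooling: sendTestRequests is a hypothetical name, and it assumes a runtime with a global fetch (for example Node 18 or later).

```typescript
// Hypothetical helper: send `count` GET requests to `url`, one after another,
// and return the response bodies. Throws on the first non-200 response.
// `fetchFn` defaults to the global fetch and can be replaced for testing.
async function sendTestRequests(
  url: string,
  count: number,
  fetchFn: (
    input: string,
  ) => Promise<{ status: number; text(): Promise<string> }> = fetch,
): Promise<string[]> {
  const bodies: string[] = [];
  for (let i = 0; i < count; i++) {
    const res = await fetchFn(url);
    if (res.status !== 200) {
      throw new Error(`Request ${i + 1} failed with status ${res.status}`);
    }
    bodies.push(await res.text());
  }
  return bodies;
}
```

Call it with your own *.workers.dev URL and a small count, such as 5. With the Worker from this guide, each returned body should be Success.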
Go to the R2 bucket you created in step 2 via the Cloudflare dashboard. You should see a prefix for today's date. Click through, and you'll find one or more files containing the records you sent in step 4.